Skip to main content

Method list

Preparation and requirements

For prerequisites and requirements to use the Pub/Sub Go SDK, refer to the SDK overview documentation.

Topic

Retrieve topic

You can retrieve a topic using the methods below.

Method
TypeMethodDescription
ClientTopic(name string) *TopicReturns a topic object regardless of existence; allows configuration such as Publish Setting
TopicExists(ctx context.Context) (bool, error)Checks from the server whether the topic actually exists
TopicConfig(ctx context.Context) (*TopicConfig, error)Retrieves information about the topic
Parameters

No parameters.

Example
func (c *Client) Topic(name string) *Topic
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicGet looks up the topic called name in the given domain and project,
// checks on the server that it exists, and prints the topic handle followed
// by its configuration.
//
// It returns an error when the client cannot be created, when the existence
// or configuration request fails, or when the topic does not exist.
func topicGet(domainID, projectID, name string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Topic returns a handle regardless of existence; Exists is the call
	// that actually asks the server.
	t := client.Topic(name)
	switch found, err := t.Exists(ctx); {
	case err != nil:
		return fmt.Errorf("topic.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found topic")
	}
	fmt.Printf("Topic: %v\n", t)

	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config: %v", err)
	}
	fmt.Printf("Topic config : %+v\n", cfg)

	return nil
}

Retrieve topic list

Retrieves the list of topics in a project.

Method
TypeMethod
ClientTopics(ctx context.Context) *TopicIterator
Parameters

No parameters.

Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicList prints every topic returned by the client's topic iterator for
// the given domain and project.
//
// Iteration stops cleanly when the iterator reports iterator.Done; any other
// iterator error is returned to the caller.
func topicList(domainID, projectID string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topics := client.Topics(ctx)
	for {
		t, nextErr := topics.Next()
		switch {
		case nextErr == iterator.Done:
			// End of the listing.
			return nil
		case nextErr != nil:
			return fmt.Errorf("Next: %v", nextErr)
		}
		fmt.Printf("Topic: %v\n", t)
	}
}

Retrieve subscription list of topic

Retrieves the list of subscriptions for a specific topic.

Method
TypeMethod
TopicSubscriptions(ctx context.Context) *SubscriptionIterator
Parameters

No parameters.

Example
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicSubscriptions prints every subscription returned by the subscription
// iterator of the topic called name.
//
// It returns nil once the iterator reports iterator.Done and returns any
// other iterator error wrapped with a "Next" prefix.
func topicSubscriptions(domainID, projectID, name string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subs := client.Topic(name).Subscriptions(ctx)
	for {
		sub, err := subs.Next()
		if err != nil {
			if err == iterator.Done {
				// Listing exhausted.
				return nil
			}
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Subscription: %v\n", sub)
	}
}

Modify topic

You can update the retention period and description of a specific topic.

Method
TypeMethod
TopicUpdate(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
Parameters
TypeFieldDescription
TopicConfigToUpdatecfgObject containing information to update the topic

pubsub.TopicConfigToUpdate

TypeFieldDescription
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days)
stringDescriptionTopic description
Example
func (t *Topic) Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// topicUpdate sets the message retention period of the topic called name to
// duration, replaces its description with a fixed sample text, and prints
// the retention period from the configuration returned by Update.
//
// It returns an error when the client cannot be created or the update fails.
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(name)

	cfg := &pubsub.TopicConfigToUpdate{
		RetentionDuration: duration,
		Description:       "new description",
	}
	tc, err := topic.Update(ctx, cfg)
	if err != nil {
		// Error strings carry no trailing newline so callers can wrap or
		// log them without stray line breaks.
		return fmt.Errorf("Update: %v", err)
	}
	fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
	return nil
}

Create topic

You can create a new topic using the methods below.

Method
TypeMethodDescription
ClientCreateTopic(ctx context.Context, name string) (*Topic, error)Retention period is set to default (7 days)
ClientCreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error)Allows specifying topic configuration
Parameters
TypeFieldDescription
TopicConfigcfgObject containing information about the topic to be created

pubsub.TopicConfig

TypeFieldDescription
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days)
stringDescriptionTopic description
Example
func (c *Client) CreateTopic(ctx context.Context, name string) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicCreate creates a topic called name with the default configuration and
// prints the resulting topic handle.
//
// It returns an error when the client cannot be created or the create
// request fails.
func topicCreate(domainID, projectID, name string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	created, createErr := client.CreateTopic(ctx, name)
	if createErr != nil {
		return fmt.Errorf("CreateTopic: %v", createErr)
	}

	fmt.Printf("Topic created: %v\n", created)
	return nil
}
func (c *Client) CreateTopicWithConfig(ctx context.Context, name string, tc *TopicConfig) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// topicCreateWithConfig creates a topic called name whose message retention
// period is duration and whose description is a fixed sample text, then
// prints the resulting topic handle.
//
// It returns an error when the client cannot be created or the create
// request fails.
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Build the configuration as a pointer up front, since
	// CreateTopicWithConfig takes *TopicConfig.
	tc := &pubsub.TopicConfig{
		RetentionDuration: duration,
		Description:       "topic description",
	}

	created, createErr := client.CreateTopicWithConfig(ctx, name, tc)
	if createErr != nil {
		return fmt.Errorf("CreateTopicWithConfig: %v", createErr)
	}

	fmt.Printf("Topic created: %v\n", created)
	return nil
}

Delete topic

You can delete a topic using the method below.

caution

When a topic is deleted, all associated subscriptions are also deleted.

Method
TypeMethod
TopicDelete(ctx context.Context) error
Parameters

No parameters.

Example
func (t *Topic) Delete(ctx context.Context) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicDelete deletes the topic called name in the given domain and project.
//
// It returns an error when the client cannot be created or the delete
// request fails.
func topicDelete(domainID, projectID, name string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(name)

	if err := topic.Delete(ctx); err != nil {
		// Error strings carry no trailing newline so callers can wrap or
		// log them without stray line breaks.
		return fmt.Errorf("Delete: %v", err)
	}
	return nil
}

Subscription

Retrieve subscription

Check whether a subscription exists.

Method
TypeMethodDescription
ClientSubscription(name string) *SubscriptionReturns a subscription object regardless of existence
- Enables configuration such as ReceiveSettings
SubscriptionExists(ctx context.Context) (bool, error)Checks from the server whether the subscription actually exists
SubscriptionConfig(ctx context.Context) (*SubscriptionConfig, error)Retrieves information about the subscription
Parameters

No parameters.

Example
func (c *Client) Subscription(name string) *Subscription
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionGet looks up the subscription called subName, checks on the
// server that it exists, and prints the subscription handle followed by its
// configuration.
//
// It returns an error when the client cannot be created, when the existence
// or configuration request fails, or when the subscription does not exist.
func subscriptionGet(domainID, projectID, subName string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Subscription returns a handle regardless of existence; Exists is the
	// call that actually asks the server.
	sub := client.Subscription(subName)
	found, err := sub.Exists(ctx)
	switch {
	case err != nil:
		return fmt.Errorf("subscription.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found subscription")
	}
	fmt.Printf("Subscription: %v\n", sub)

	cfg, err := sub.Config(ctx)
	if err != nil {
		return fmt.Errorf("subscription.Config: %v", err)
	}
	fmt.Printf("Subscription config : %+v\n", cfg)
	return nil
}

Retrieve subscription list

Retrieves the list of subscriptions belonging to the domain and project set in the client.

Method
TypeMethodDescription
ClientSubscriptions(ctx context.Context) *SubscriptionIteratorReturns an iterator for retrieving actual subscription info
Parameters

No parameters.

Example
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionList prints every subscription returned by the client's
// subscription iterator for the given domain and project.
//
// Iteration ends when the iterator reports iterator.Done; any other iterator
// error is returned to the caller.
func subscriptionList(domainID, projectID string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subs := client.Subscriptions(ctx)
	for {
		sub, nextErr := subs.Next()
		switch {
		case nextErr == iterator.Done:
			// End of the listing.
			return nil
		case nextErr != nil:
			return fmt.Errorf("Next: %v", nextErr)
		}
		fmt.Printf("Subscription: %v\n", sub)
	}
}

Modify subscription

Updates a subscription. At least one field must be provided for modification.

Method
TypeMethodDescription
SubscriptionUpdate(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResultUpdates the subscription based on SubscriptionConfigToUpdate values
Parameters
TypeFieldDescription
SubscriptionConfigToUpdatecfgConfiguration values for the update

pubsub.SubscriptionConfigToUpdate

TypeFieldDescription
time.DurationAckDeadlineAcknowledgment deadline
time.DurationRetentionDurationMessage retention duration
- Default: 604,800s (7 days)
intMaxDeliveryAttemptMax delivery attempts
- Min: 1, Max: infinite (1~100)
- Default: -1
PushConfigPushConfigConfiguration for Push subscription
ObjectStorageConfigObjectStorageConfigConfiguration for Object Storage subscription

pubsub.PushConfig

TypeFieldDescription
stringEndpointTarget endpoint to deliver messages
intBatchSizeNumber of messages sent per batch
- Default: 1

pubsub.ObjectStorageConfig

TypeFieldDescription
stringBucketName of Object Storage bucket
intExportIntervalMinutesExport interval to Object Storage
- Default: 5
stringFilePrefixPrefix for exported file
stringFileSuffixSuffix for exported file

pubsub.SubscriptionResult

MethodDescription
Get(ctx context.Context) (string, error)Retrieves the final status of the update request (blocks until complete).
⚠️ Enabled only with option.WithWaitStatus()
Name() stringReturns the name of the requested subscription
Example
func (s *Subscription) Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)

// subscriptionUpdate applies a sample update to the subscription called
// subName: a push configuration, a 30-second ack deadline, a 48-hour
// retention period and a maximum of 20 delivery attempts. It waits for the
// final status and prints it.
//
// It returns an error when the client cannot be created or when the update
// does not complete successfully.
func subscriptionUpdate(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	updateConfig := &pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{
			Endpoint:  "http://localhost:8888/push",
			BatchSize: 100,
		},
		AckDeadline:        30 * time.Second,
		RetentionDuration:  48 * time.Hour,
		MaxDeliveryAttempt: 20,
	}

	// WithWaitStatus is required for result.Get to report the final status.
	result := subscription.Update(ctx, updateConfig, option.WithWaitStatus())

	status, err := result.Get(ctx)
	if err != nil {
		// Error strings carry no trailing newline so callers can wrap or
		// log them without stray line breaks.
		return fmt.Errorf("Update: %v", err)
	}
	fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)

	return nil
}

Rewind subscription time

Rewind a subscription to a specified point in time and retrieve all messages published after that time.

Method
TypeMethodDescription
SubscriptionSeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResultRewinds the subscription to the specified time
Parameters
TypeFieldDescription
time.TimetTime to which the subscription should be rewound

pubsub.SubscriptionResult

TypeMethodDescription
SubscriptionResultGet(ctx context.Context) (string, error)Returns the final state of the subscription request (blocking until complete).
⚠️ Requires option.WithWaitStatus()
SubscriptionResultName() stringReturns the name of the requested subscription
Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)

// subscriptionSeek rewinds the subscription called subName to one hour
// before the current time and waits for the request to finish.
//
// It returns an error when the client cannot be created or when the seek
// does not complete successfully.
func subscriptionSeek(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// WithWaitStatus is required for result.Get to report the final status.
	result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())

	// Only the error matters here; the status string is intentionally unused.
	if _, err := result.Get(ctx); err != nil {
		// Error strings carry no trailing newline so callers can wrap or
		// log them without stray line breaks.
		return fmt.Errorf("Seek: %v", err)
	}
	return nil
}

Create subscription

You can create a subscription using the following method.

Method
TypeMethodDescription
ClientCreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResultCreates a subscription (default message retention: 7 days)
Parameters
TypeFieldDescription
SubscriptionConfigcfgObject containing subscription settings

pubsub.SubscriptionConfig

TypeFieldDescription
stringTopicTopic name
time.DurationAckDeadlineAcknowledgment deadline
time.DurationRetentionDurationMessage retention duration
- Default: 604,800s (7 days)
PushConfigPushConfigSettings for push subscription
ObjectStorageConfigObjectStorageConfigSettings for object storage subscription
intMaxDeliveryAttemptMax delivery attempts
- Default: -1 (unlimited)

pubsub.PushConfig

TypeFieldDescription
stringEndpointDestination endpoint for messages
intBatchSizeNumber of messages per batch
- Default: 1

pubsub.ObjectStorageConfig

TypeFieldDescription
stringBucketObject Storage bucket name
intExportIntervalMinutesExport interval (min: 1, max: 10, default: 5)

⚠️ At least one file is created per interval. Files may also be created before interval ends
stringFilePrefixFile prefix
stringFileSuffixFile suffix

pubsub.SubscriptionResult

TypeMethodDescription
SubscriptionResultGet(ctx context.Context) (string, error)Returns the final state of the subscription request (blocking)
⚠️ Requires option.WithWaitStatus()
SubscriptionResultName() stringName of the created subscription
Example
func (c *Client) CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)

// subscriptionCreate creates three sample subscriptions on topicName: a pull
// subscription, a push subscription and an Object Storage subscription. Each
// is created in turn; the function waits for its final status and prints it
// before moving on.
//
// It returns an error when the client cannot be created or as soon as one of
// the subscriptions fails to be created. The error names the subscription
// that failed; subscriptions created before the failure are left in place.
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Neither PushConfig nor ObjectStorageConfig is set for the pull case.
	pullSubConfig := &pubsub.SubscriptionConfig{
		Topic:              topicName,
		AckDeadline:        15 * time.Second,
		RetentionDuration:  24 * time.Hour,
		MaxDeliveryAttempt: 10,
	}

	pushSubConfig := &pubsub.SubscriptionConfig{
		Topic:              topicName,
		AckDeadline:        15 * time.Second,
		RetentionDuration:  24 * time.Hour,
		MaxDeliveryAttempt: 10,
		PushConfig: &pubsub.PushConfig{
			Endpoint:  "http://localhost:9999/push",
			BatchSize: 1,
		},
	}

	objectStorageSubConfig := &pubsub.SubscriptionConfig{
		Topic:             topicName,
		RetentionDuration: 24 * time.Hour,
		ObjectStorageConfig: &pubsub.ObjectStorageConfig{
			Bucket:                "bucket",
			ExportIntervalMinutes: 2,
			FilePrefix:            "prefix",
			FileSuffix:            "suffix",
		},
	}

	// Create the subscriptions in the same order as before (pull, push,
	// object storage) with one shared create/wait/report sequence.
	requests := []struct {
		name string
		cfg  *pubsub.SubscriptionConfig
	}{
		{name: pullSubName, cfg: pullSubConfig},
		{name: pushSubName, cfg: pushSubConfig},
		{name: objectStorageSubName, cfg: objectStorageSubConfig},
	}
	for _, req := range requests {
		// WithWaitStatus is required for result.Get to report the final status.
		result := client.CreateSubscription(ctx, req.name, req.cfg, option.WithWaitStatus())
		status, err := result.Get(ctx)
		if err != nil {
			return fmt.Errorf("Create %q: %v", req.name, err)
		}
		fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
	}

	return nil
}

Delete subscription

Delete a subscription using the method below.

Method
TypeMethodDescription
SubscriptionDelete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResultDeletes subscription
Parameters

pubsub.SubscriptionResult

TypeMethodDescription
SubscriptionResultGet(ctx context.Context) (string, error)Returns final subscription status (blocking).
⚠️ Requires option.WithWaitStatus()
SubscriptionResultName() stringName of the subscription request
Example
func (s *Subscription) Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
)

// subscriptionDelete deletes the subscription called subName, waits for the
// request to finish, and prints the name of the deleted subscription.
//
// It returns an error when the client cannot be created or when the delete
// does not complete successfully.
func subscriptionDelete(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// WithWaitStatus is required for result.Get to report the final status.
	result := subscription.Delete(ctx, option.WithWaitStatus())

	// Only the error matters here; the status string is intentionally unused.
	if _, err := result.Get(ctx); err != nil {
		// Error strings carry no trailing newline so callers can wrap or
		// log them without stray line breaks.
		return fmt.Errorf("Delete: %v", err)
	}
	fmt.Printf("Subscription deleted %s\n", result.Name())
	return nil
}

Message

Publish message

You can publish messages to a topic.

Method
| Type | Method |
| --- | --- |
| Topic | Publish(ctx context.Context, msg *Message) *PublishResult |
Parameters
TypeFieldDescription
MessagemsgMessage to be published

pubsub.Message

TypeFieldDescription
stringDataMessage body
⚠️ Must be Base64-encoded
map[string]stringAttributesMessage attributes in key-value map format
⚠️ Keys starting with kakaoc are not allowed

pubsub.PublishSettings

TypeFieldDescription
time.DurationDelayThresholdSend batched messages to server at this interval
- Default: 100ms
intCountThresholdBatch this many messages before sending
- Default: 100
intNumGoroutinesMax number of goroutines
- Default: 2, Max: 10
time.DurationTimeoutTimeout for server requests
- Default: 60s
FlowControlSettingsFlowControlSettingsControls message flow to prevent overload

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesMax number of unprocessed messages
- Default: 1000
intMaxOutstandingBytesMax size of unprocessed messages
- Default: 1G
LimitExceededBehaviorLimitExceededBehaviorBehavior when limit exceeded:
- ignore : No flow control
- block (default): Wait
- signal error: Return error

pubsub.PublishResult

| Type | Method | Description |
| --- | --- | --- |
| PublishResult | Get(ctx context.Context) (string, error) | Retrieves the message ID from the server. Blocks until the message is published. |
Example
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
import (
"context"
"encoding/base64"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicPublish publishes msg, Base64-encoded, with the given attributes to
// the topic called name, waits for the publish result, and prints the
// message ID.
//
// It returns an error when the client cannot be created or when the publish
// result reports a failure.
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %v", err)
	}
	defer client.Close()

	t := client.Topic(name)

	// The message body must be Base64-encoded before it is handed to Publish.
	encoded := base64.StdEncoding.EncodeToString([]byte(msg))
	outgoing := &pubsub.Message{
		Data:       encoded,
		Attributes: attributes,
	}

	// Get blocks until the message is published.
	msgID, getErr := t.Publish(ctx, outgoing).Get(ctx)
	if getErr != nil {
		return fmt.Errorf("pubsub: result.Get: %v", getErr)
	}
	fmt.Printf("Published a message; msg ID: %v\n", msgID)
	return nil
}

Receive message asynchronously

Receive subscription messages asynchronously.

caution

Available only for subscriptions of type Pull.

Method
TypeMethodDescription
SubscriptionReceive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) errorReceives messages via streaming and executes callback function
Parameters
TypeFieldDescription
func(ctx2 context.Context, message *Message)callbackFuncCallback function triggered per message

pubsub.ReceiveSettings

TypeFieldDescription
intNumGoroutinesMax number of concurrent goroutines
- Default: 2, Max: 10
FlowControlSettingsFlowControlSettingsControls message flow

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesMax number of unacknowledged messages
- Default: 1000
intMaxOutstandingBytesMax size of unacknowledged messages
- Default: 1G
LimitExceededBehaviorLimitExceededBehaviorBehavior when limits exceeded
- ignore: No flow control
- block (default): Wait
- signal error: Return error

pubsub.Message

| Type | Field | Description |
| --- | --- | --- |
| string | ID | Unique message ID |
| string | Data | Message content |
| map[string]string | Attributes | Message attributes |
| time.Time | PublishTime | Publish timestamp |
| *int | DeliveryAttempt | Delivery attempt count |
Example
func (s *Subscription) Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive receives messages from the subscription called subName
// and acknowledges each one in the callback. NumGoroutines is set to 10
// before receiving starts.
//
// It returns an error when the client cannot be created; otherwise it
// returns whatever Receive returns.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subName)
	sub.ReceiveSettings.NumGoroutines = 10

	// Callback invoked per message: process it, then acknowledge.
	handle := func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		message.Ack()
	}
	return sub.Receive(ctx, handle)
}
With AckWithResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive receives messages from the subscription called subName
// and acknowledges each one with AckWithResult, printing the outcome of every
// acknowledgment. NumGoroutines is set to 10 before receiving starts.
//
// It returns an error when the client cannot be created; otherwise it
// returns whatever Receive returns. A failed acknowledgment is only printed,
// it does not stop receiving.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	subscription.ReceiveSettings.NumGoroutines = 10
	return subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		ackResult := message.AckWithResult()
		status, err := ackResult.Get(ctx2) // blocks until acknowledge is complete
		if err != nil {
			fmt.Printf("Ack error: %v\n", err)
			// The status is not meaningful when the ack failed, so do not
			// print it.
			return
		}
		fmt.Printf("Ack status: %s\n", status)
	})
}

Receive message synchronously

Receive messages from a subscription synchronously.

Method
TypeMethodDescription
SubscriptionPull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)Retrieves messages in batch from subscription synchronously
Parameters
TypeFieldDescription
intmaxMessagesNumber of messages to pull in a batch

pubsub.PulledMessage

Read-only version of pubsub.Message.

| Type | Field | Description |
| --- | --- | --- |
| string | ID | Message ID |
| string | Data | Message content |
| map[string]string | Attributes | Message attributes |
| time.Time | PublishTime | Time when published |
| *int | DeliveryAttempt | Number of deliveries |
Example
func (s *Subscription) Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionPull pulls up to 10 messages from the subscription called
// subName and prints each one with its index in the returned batch.
//
// It returns an error when the client cannot be created or the pull request
// fails.
func subscriptionPull(domainID, projectID, subName string) error {
	// Upper bound on the number of messages requested in one batch.
	const batchSize = 10

	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	pulled, pullErr := client.Subscription(subName).Pull(ctx, batchSize)
	if pullErr != nil {
		return fmt.Errorf("subscription.Pull: %v", pullErr)
	}
	for idx := range pulled {
		fmt.Printf("message [%v] : %v\n", idx, pulled[idx])
	}
	return nil
}

Acknowledge message reception

Send acknowledgment for received messages using their AckIDs.

Method
TypeMethodDescription
SubscriptionAcknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)Informs the system that the messages have been handled
Parameters
TypeFieldDescription
[]stringackIDsAckIDs list

pubsub.AcknowledgeResponse

TypeFieldDescription
[]*AcknowledgeFailureResultFailureFailed acknowledgment

pubsub.AcknowledgeFailureResult

TypeFieldDescription
stringAckIDFailed AckID
errorErrorReason for failure
Example
func (s *Subscription) Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionAck acknowledges the messages identified by ackIDs on the
// subscription called subName and prints every failure entry found in the
// response.
//
// It returns an error when the client cannot be created or the acknowledge
// request itself fails. Individual failures reported in the response are
// printed, not returned.
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	ackResp, ackErr := client.Subscription(subName).Acknowledge(ctx, ackIDs)
	if ackErr != nil {
		return fmt.Errorf("Acknowledge: %v", ackErr)
	}
	if ackResp == nil {
		// No response means there are no failures to report.
		return nil
	}
	for i := range ackResp.Failure {
		fmt.Printf("ack fail : %v\n", ackResp.Failure[i])
	}
	return nil
}

Modify message ack deadline

Change the message acknowledgment deadline.

  • Extend: Extends ack deadline by the subscription’s configured AckDeadline.
  • Skip: Sets ack deadline to zero, making the message immediately available for redelivery.
Method
TypeMethodDescription
SubscriptionModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)Modifies AckDeadline based on action specified
Parameters
TypeFieldDescription
[]stringackIDsAckIDs of the messages
AckActionactionpubsub.ActionExtend or pubsub.ActionSkip

pubsub.ModifyAckDeadlineResponse

TypeFieldDescription
[]*ModifyAckDeadlineSuccessResultSuccessSuccessful results
[]*ModifyAckDeadlineFailureResultFailureFailed results

pubsub.ModifyAckDeadlineSuccessResult

TypeFieldDescription
stringAckIDOriginal AckID
stringReissuedAckIDNew AckID after extend

pubsub.ModifyAckDeadlineFailureResult

TypeFieldDescription
stringAckIDFailed AckID
errorErrorReason for failure
Example
func (s *Subscription) ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionModifyAckDeadline demonstrates both ack-deadline actions on the
// subscription called subName: it first calls ModifyAckDeadline with
// pubsub.ActionSkip, then with pubsub.ActionExtend, for the same ackIDs, and
// prints the success and failure entries of each response.
//
// It returns an error when the client cannot be created or when either
// request itself fails. Per-AckID failures reported in a response are
// printed, not returned.
//
// NOTE(review): both actions are applied to the same ackIDs purely for
// illustration; whether an extend can still succeed after a skip is not
// shown here and should be confirmed against the service behavior.
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// report prints the per-AckID results of one ModifyAckDeadline call.
	// Reading the response here also keeps resp from being an unused
	// variable, which Go rejects at compile time.
	report := func(action string, resp *pubsub.ModifyAckDeadlineResponse) {
		if resp == nil {
			return
		}
		for _, ok := range resp.Success {
			fmt.Printf("%s success : AckID=%s ReissuedAckID=%s\n", action, ok.AckID, ok.ReissuedAckID)
		}
		for _, failed := range resp.Failure {
			fmt.Printf("%s fail : AckID=%s error=%v\n", action, failed.AckID, failed.Error)
		}
	}

	// Skip example
	resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline (skip): %v", err)
	}
	report("skip", resp)

	// Extend example
	resp, err = subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline (extend): %v", err)
	}
	report("extend", resp)

	return nil
}