Skip to main content

Method list

Preparation and requirements

For preparation and requirements to use the Pub/Sub Go SDK, refer to the SDK Overview document.

Topic

Create topic

You can create a new topic using the following items.

Method
TypeMethodDescription
ClientCreateTopic(ctx context.Context, name string) (*Topic, error)The message retention period is set to the default value (7 days).
ClientCreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error)
Parameters
TypeFieldDescription
TopicConfigcfgAn object containing information about the topic to be created.

pubsub.TopicConfig

TypeFieldDescription
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days).
stringDescriptionTopic description.
Example
func (c *Client) CreateTopic(ctx context.Context, name string) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicCreate creates the topic called name in the given domain and project
// and prints the returned topic. CreateTopic takes no configuration, so the
// topic gets the default message retention period (7 days).
func topicCreate(domainID, projectID, name string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	// Close the client on every return path below.
	defer c.Close()

	created, err := c.CreateTopic(ctx, name)
	if err != nil {
		return fmt.Errorf("CreateTopic: %v", err)
	}
	fmt.Printf("Topic created: %v\n", created)
	return nil
}
func (c *Client) CreateTopicWithConfig(ctx context.Context, name string, tc *TopicConfig) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// topicCreateWithConfig creates the topic called name with an explicit
// retention period (duration) and a fixed description, then prints the
// created topic.
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer c.Close()

	// Build the configuration in place and pass its address to the call.
	created, err := c.CreateTopicWithConfig(ctx, name, &pubsub.TopicConfig{
		RetentionDuration: duration,
		Description:       "topic description",
	})
	if err != nil {
		return fmt.Errorf("CreateTopicWithConfig: %v", err)
	}
	fmt.Printf("Topic created: %v\n", created)
	return nil
}

Retrieve topic

You can retrieve a topic using the following items.

Method
TypeMethodDescription
ClientTopic(name string) *TopicReturns a topic object regardless of its actual existence, allowing you to configure settings such as publish settings.
TopicExists(ctx context.Context) (bool, error)Checks the server to determine if the topic actually exists.
TopicConfig(ctx context.Context) (*TopicConfig, error)Retrieves information about the topic.
Parameters

There are no parameters.

Example
func (c *Client) Topic(name string) *Topic
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicGet verifies that the topic called name exists, then prints the topic
// handle and its configuration. It returns an error when the topic is missing
// or when any request fails.
func topicGet(domainID, projectID, name string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer c.Close()

	// Topic returns a handle whether or not the topic exists; Exists is the
	// call that actually checks with the server.
	t := c.Topic(name)
	switch found, err := t.Exists(ctx); {
	case err != nil:
		return fmt.Errorf("topic.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found topic")
	}
	fmt.Printf("Topic: %v\n", t)

	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config: %v", err)
	}
	fmt.Printf("Topic config : %+v\n", cfg)
	return nil
}

Retrieve topic list

Retrieves the list of topics within the project.

Method
TypeMethod
ClientTopics(ctx context.Context) *TopicIterator
Parameters

There are no parameters.

Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicList prints every topic in the project, one per line, by draining the
// iterator returned by Topics.
func topicList(domainID, projectID string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer c.Close()

	topics := c.Topics(ctx)
	for {
		t, err := topics.Next()
		switch {
		case err == iterator.Done:
			// iterator.Done marks the normal end of the listing.
			return nil
		case err != nil:
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Topic: %v\n", t)
	}
}

Retrieve subscription list of a topic

Retrieves the list of subscriptions attached to a specific topic.

Method
TypeMethod
TopicSubscriptions(ctx context.Context) *SubscriptionIterator
Parameters

There are no parameters.

Example
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicSubscriptions prints every subscription attached to the topic called
// name by draining the iterator returned by Topic.Subscriptions.
func topicSubscriptions(domainID, projectID, name string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer c.Close()

	subs := c.Topic(name).Subscriptions(ctx)
	for {
		s, err := subs.Next()
		switch {
		case err == iterator.Done:
			// iterator.Done marks the normal end of the listing.
			return nil
		case err != nil:
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Subscription: %v\n", s)
	}
}

Modify topic

Allows modification of the retention period and description of a specific topic.

Method
TypeMethod
TopicUpdate(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
Parameters
TypeFieldDescription
TopicConfigToUpdatecfgAn object containing information about the topic to be modified.

pubsub.TopicConfigToUpdate

TypeFieldDescription
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days).
stringDescriptionTopic description.
Example
func (t *Topic) Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// topicUpdate sets the retention period of the topic called name to duration
// and replaces its description, then prints the retention period found in the
// configuration returned by Update.
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(name)

	cfg := &pubsub.TopicConfigToUpdate{
		RetentionDuration: duration,
		Description:       "new description",
	}
	tc, err := topic.Update(ctx, cfg)
	if err != nil {
		// No trailing newline: error strings are usually embedded in other
		// messages by the caller.
		return fmt.Errorf("Update: %v", err)
	}
	fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
	return nil
}

Delete topic

You can delete a topic using the following items.

caution

When a topic is deleted, its associated subscriptions are also deleted.

Method
TypeMethod
TopicDelete(ctx context.Context) error
Parameters

There are no parameters.

Example
func (t *Topic) Delete(ctx context.Context) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicDelete deletes the topic called name. Per the SDK documentation,
// deleting a topic also deletes the subscriptions associated with it.
func topicDelete(domainID, projectID, name string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(name)

	if err := topic.Delete(ctx); err != nil {
		// No trailing newline: error strings are usually embedded in other
		// messages by the caller.
		return fmt.Errorf("Delete: %v", err)
	}
	return nil
}

Subscription

Create subscription

You can create a subscription using the following items.

Method
TypeMethodDescription
ClientCreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResultThe message retention period is set to the default value (7 days).
Parameters
TypeFieldDescription
SubscriptionConfigcfgAn object containing information about the subscription to be created.

pubsub.SubscriptionConfig

TypeFieldDescription
stringTopicTopic name.
time.DurationAckDeadlineMessage acknowledgment waiting time.
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days).
PushConfigPushConfigSettings for PushSubscription.
ObjectStorageConfigObjectStorageConfigSettings for ObjectStorageSubscription.
intMaxDeliveryAttemptNumber of retry attempts
- Default: -1 (infinite).

pubsub.PushConfig

TypeFieldDescription
stringEndpointDestination endpoint for sending messages.
intBatchSizeNumber of messages to send at once
- Default: 1.

pubsub.ObjectStorageConfig

TypeFieldDescription
stringBucketObject Storage bucket name.
intExportIntervalMinutesFrequency for storing to Object Storage
- Min: 1 min
- Max: 10 min
- Default: 5 min.

⚠️ At least one file is created during the interval, but files may be created before the interval ends.
stringFilePrefixPrefix for files stored in Object Storage.
stringFileSuffixSuffix for files stored in Object Storage.

pubsub.SubscriptionResult

TypeFieldDescription
SubscriptionResultGet(ctx context.Context) (string, error)Retrieves the state of the completed subscription request and blocks until the request is complete.
⚠️ Activated when the option.WithWaitStatus() option is set.
SubscriptionResultName() stringName of the requested subscription.
Example
func (c *Client) CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// subscriptionCreate creates three subscriptions on topicName, one of each
// kind: a pull subscription (pullSubName), a push subscription (pushSubName)
// and an Object Storage subscription (objectStorageSubName).
//
// Each request is issued with option.WithWaitStatus() so that Get blocks until
// the request has completed. The subscriptions are created one after another
// and the function stops at the first failure.
//
// NOTE(review): option.WithWaitStatus comes from the SDK's option package,
// which the example's import block does not list — confirm its import path.
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Pull subscription: neither PushConfig nor ObjectStorageConfig is set.
	pullSubConfig := &pubsub.SubscriptionConfig{
		Topic:              topicName,
		AckDeadline:        15 * time.Second,
		RetentionDuration:  24 * time.Hour,
		MaxDeliveryAttempt: 10,
	}

	// Push subscription: messages are sent to Endpoint, BatchSize at a time.
	pushSubConfig := &pubsub.SubscriptionConfig{
		Topic:              topicName,
		AckDeadline:        15 * time.Second,
		RetentionDuration:  24 * time.Hour,
		MaxDeliveryAttempt: 10,
		PushConfig: &pubsub.PushConfig{
			Endpoint:  "http://localhost:9999/push",
			BatchSize: 1,
		},
	}

	// Object Storage subscription: messages are exported to the bucket.
	objectStorageSubConfig := &pubsub.SubscriptionConfig{
		Topic:             topicName,
		RetentionDuration: 24 * time.Hour,
		ObjectStorageConfig: &pubsub.ObjectStorageConfig{
			Bucket:                "bucket",
			ExportIntervalMinutes: 2,
			FilePrefix:            "prefix",
			FileSuffix:            "suffix",
		},
	}

	requests := []struct {
		name string
		cfg  *pubsub.SubscriptionConfig
	}{
		{pullSubName, pullSubConfig},
		{pushSubName, pushSubConfig},
		{objectStorageSubName, objectStorageSubConfig},
	}
	for _, req := range requests {
		result := client.CreateSubscription(ctx, req.name, req.cfg, option.WithWaitStatus())

		// Get blocks until the request completes and returns its state.
		status, err := result.Get(ctx)
		if err != nil {
			return fmt.Errorf("Create: %v", err)
		}
		fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
	}
	return nil
}

Retrieve subscription

Checks the existence of a subscription.

Method
TypeMethodDescription
ClientSubscription(name string) *SubscriptionReturns a subscription object regardless of its actual existence.
- Allows configuration of settings such as ReceiveSettings.
SubscriptionExists(ctx context.Context) (bool, error)Queries the server to check if the subscription actually exists.
SubscriptionConfig(ctx context.Context) (*SubscriptionConfig, error)Retrieves information about the subscription.
Parameters

There are no parameters.

Example
func (c *Client) Subscription(name string) *Subscription
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionGet verifies that the subscription subName exists, then prints
// the subscription handle and its configuration. It returns an error when the
// subscription is missing or when any request fails.
func subscriptionGet(domainID, projectID, subName string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer c.Close()

	// Subscription returns a handle whether or not the subscription exists;
	// Exists is the call that actually queries the server.
	sub := c.Subscription(subName)
	switch found, err := sub.Exists(ctx); {
	case err != nil:
		return fmt.Errorf("subscription.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found subscription")
	}
	fmt.Printf("Subscription: %v\n", sub)

	cfg, err := sub.Config(ctx)
	if err != nil {
		return fmt.Errorf("subscription.Config: %v", err)
	}
	fmt.Printf("Subscription config : %+v\n", cfg)
	return nil
}

Retrieve subscription list

Retrieves the list of subscriptions belonging to the domain and project configured in the client.

Method
TypeMethodDescription
ClientSubscriptions(ctx context.Context) *SubscriptionIteratorReturns an iterator that retrieves actual subscription information.
Parameters

There are no parameters.

Example
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionList prints every subscription that belongs to the domain and
// project configured in the client, by draining the iterator returned by
// Subscriptions.
func subscriptionList(domainID, projectID string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Reuse ctx instead of creating a second background context, so that any
	// deadline or cancellation later attached to ctx also covers the listing.
	it := client.Subscriptions(ctx)
	for {
		subscription, err := it.Next()
		if err == iterator.Done {
			// iterator.Done marks the normal end of the listing.
			break
		}
		if err != nil {
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Subscription: %v\n", subscription)
	}
	return nil
}

Modify subscription

Modifies a subscription. At least one item must be included for modification.

NameDescription
AckDeadlineMust differ from the existing value.
- Min: 10 sec
- Max: 600 sec
MessageRetentionDurationMust differ from the existing value.
- Min: 600 sec
- Max: 604800 sec (7 days)
MaxDeliveryAttemptNumber of retry attempts.
- Min: 1
- Max: Infinite (values between 1–100 can be specified).
- Default: -1
PushConfig.EndpointRequired only for PushSubscriptions and must differ from the existing value.
PushConfig.BatchSizeRequired only for PushSubscriptions and must differ from the existing value.
- Default: 1 (unlimited).
ObjectStorageConfig.BucketRequired only for ObjectStorageSubscriptions.
ObjectStorageConfig.ExportIntervalMinutes- Min: 1 min
- Max: 10 min
- Default: 5 min
ObjectStorageConfig.FilePrefixFor ObjectStorageSubscriptions, must differ from the existing value.
ObjectStorageConfig.FileSuffixFor ObjectStorageSubscriptions, must differ from the existing value.
Method
TypeMethodDescription
SubscriptionUpdate(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResultModifies the subscription based on the SubscriptionConfigToUpdate values.
Parameters
TypeFieldDescription
SubscriptionConfigToUpdatecfgConfiguration values required for modifying a subscription.

pubsub.SubscriptionConfigToUpdate

TypeFieldDescription
time.DurationAckDeadlineMessage acknowledgment waiting time.
time.DurationRetentionDurationMessage retention period
- Default: 604,800s (7 days).
intMaxDeliveryAttemptNumber of retry attempts
- Min: 1
- Max: Infinite (values between 1–100 can be specified).
- Default: -1.
PushConfigPushConfigSettings for PushSubscription.
ObjectStorageConfigObjectStorageConfigSettings for ObjectStorageSubscription.

pubsub.PushConfig

TypeFieldDescription
stringEndpointEndpoint for sending messages.
intBatchSizeNumber of messages to send at once
- Default: 1.

pubsub.ObjectStorageConfig

TypeFieldDescription
stringBucketObject Storage bucket name.
intExportIntervalMinutesFrequency for storing to Object Storage
- Default: 5.
stringFilePrefixPrefix for files stored in Object Storage.
stringFileSuffixSuffix for files stored in Object Storage.

pubsub.SubscriptionResult

TypeFieldDescription
SubscriptionResultGet(ctx context.Context) (string, error)Retrieves the state of the completed subscription request and blocks until the request is complete.
⚠️ Activated when the option.WithWaitStatus() option is set.
SubscriptionResultName() stringName of the requested subscription.
Example
func (s *Subscription) Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// subscriptionUpdate modifies the subscription subName: it sets a new push
// endpoint and batch size, acknowledgment deadline, retention period and
// maximum delivery attempts, waits for the request to complete, and prints the
// resulting state.
func subscriptionUpdate(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	updateConfig := &pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{
			Endpoint:  "http://localhost:8888/push",
			BatchSize: 100,
		},
		AckDeadline:        30 * time.Second,
		RetentionDuration:  48 * time.Hour,
		MaxDeliveryAttempt: 20,
	}
	// Use the same ctx for the request and for waiting on its result.
	result := subscription.Update(ctx, updateConfig, option.WithWaitStatus())

	// Get blocks until the request completes and returns its state.
	status, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Update: %v", err)
	}
	fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)

	return nil
}

Delete subscription

You can delete a subscription using the following items.

Method
TypeMethodDescription
SubscriptionDelete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResultDeletes the subscription.
Parameters

pubsub.SubscriptionResult

TypeFieldDescription
SubscriptionResultGet(ctx context.Context) (string, error)Retrieves the state of the completed subscription request and blocks until the request is complete.
⚠️ Activated when the option.WithWaitStatus() option is set.
SubscriptionResultName() stringName of the requested subscription.
Example
func (s *Subscription) Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionDelete deletes the subscription subName, waits for the request
// to complete, and prints the name of the deleted subscription.
func subscriptionDelete(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	result := subscription.Delete(ctx, option.WithWaitStatus())

	// Only the error matters here; the returned state string is not used.
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("Delete: %v", err)
	}
	fmt.Printf("Subscription deleted %s\n", result.Name())
	return nil
}

Rewind subscription time

Rewinds the subscription to a specified point in time, allowing messages received after that time to be processed again.

Method
TypeMethodDescription
SubscriptionSeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResultRewinds the subscription time.
Parameters
TypeFieldDescription
time.TimetThe point in time to which the subscription should be rewound.

pubsub.SubscriptionResult

TypeFieldDescription
SubscriptionResultGet(ctx context.Context) (string, error)Retrieves the state of the completed subscription request and blocks until the request is complete.
⚠️ Activated when the option.WithWaitStatus() option is set.
SubscriptionResultName() stringName of the requested subscription.
Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// subscriptionSeek rewinds the subscription subName to one hour before the
// current time and waits for the request to complete, so that messages
// received after that point can be processed again.
func subscriptionSeek(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	rewindTo := time.Now().Add(-1 * time.Hour)
	result := subscription.SeekToTime(ctx, rewindTo, option.WithWaitStatus())

	// Only the error matters here; the returned state string is not used.
	if _, err := result.Get(ctx); err != nil {
		return fmt.Errorf("Seek: %v", err)
	}
	return nil
}

Message

Publish message

Allows publishing a message to a topic.

Method
TypeMethod
TopicPublish(ctx context.Context, msg *Message) *PublishResult
Parameters
TypeFieldDescription
MessagemsgThe message to be published.

pubsub.Message

TypeFieldDescription
stringDataMessage body.
⚠️ Must be Base64-encoded.
map[string]stringAttributesMessage attributes in a [key, value] map format.
⚠️ Keys starting with kakaoc are not allowed.

pubsub.PublishSettings

TypeFieldDescription
time.DurationDelayThresholdSends publish requests to the server at the specified interval.
- Default: 100ms.
intCountThresholdCollects and sends messages in batches of the specified size.
- Default: 100.
intNumGoroutinesLimits the number of concurrently running goroutines.
- Default: 2.
- Max: 10.
time.DurationTimeoutSets the timeout for server requests.
- Default: 60s.
FlowControlSettingsFlowControlSettingsControls the publishing rate through flow control.

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesSpecifies the maximum number of messages pending publication.
- Default: 1000.
intMaxOutstandingBytesSpecifies the maximum size of messages pending publication.
- Default: 1G.
LimitExceededBehaviorLimitExceededBehaviorDefines behavior when the limit is exceeded.
- ignore: Disables flow control.
- block (default): Waits until requests can be made without exceeding the limit (useful for batching processing where latency of individual requests is not critical).
- signal error: Returns an error.

pubsub.PublishResult

TypeFunctionDescription
stringGet(ctx context.Context)Retrieves the message id of the message published to the server.
- Blocks until the message is published.
Example
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
import (
	"context"
	"encoding/base64"
	"fmt"

	"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicPublish publishes msg, Base64-encoded, together with attributes to the
// topic called name, and prints the message ID returned for it.
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
	ctx := context.Background()

	c, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %v", err)
	}
	defer c.Close()

	// The message body must be Base64-encoded before it is published.
	encoded := base64.StdEncoding.EncodeToString([]byte(msg))
	outgoing := &pubsub.Message{
		Data:       encoded,
		Attributes: attributes,
	}

	// Get blocks until the message is published and yields the message ID
	// assigned to it.
	msgID, err := c.Topic(name).Publish(ctx, outgoing).Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %v", err)
	}
	fmt.Printf("Published a message; msg ID: %v\n", msgID)
	return nil
}

Receive message asynchronously

Receives messages from a subscription asynchronously.

caution

This can only be used for pull-type subscriptions.

Method
TypeMethodDescription
SubscriptionReceive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) errorStreams messages from the subscription and executes the callback function.
Parameters
TypeFieldDescription
func(ctx2 context.Context, message *Message)callbackFuncCallback function to execute after receiving a message.

pubsub.ReceiveSettings

TypeFieldDescription
intNumGoroutinesLimits the number of concurrently running Goroutines.
- Default: 2.
- Max: 10.
FlowControlSettingsFlowControlSettingsControls the publishing rate through flow control.

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesSpecifies the maximum number of messages pending publication.
- Default: 1000.
intMaxOutstandingBytesSpecifies the maximum size of messages pending publication.
- Default: 1G.
LimitExceededBehaviorLimitExceededBehaviorDefines behavior when the limit is exceeded.
- ignore: Disables flow control.
- block (default): Waits until requests can be made without exceeding the limit (useful for batching processing where latency of individual requests is not critical).
- signal error: Returns an error.

pubsub.Message

TypeFunctionDescription
stringIDUnique ID of the message.
stringDataMessage body.
map[string]stringAttributesMessage attributes.
time.TimePublishTimeTime when the message was published.
*intDeliveryAttemptNumber of times the message has been received.
Example
func (s *Subscription) Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive streams messages from the pull subscription subName and
// acknowledges each one from the callback.
//
// It returns the error reported by Receive, if any, instead of discarding it.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	subscription.ReceiveSettings.NumGoroutines = 10 // documented maximum
	err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		message.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive streams messages from the pull subscription subName and
// acknowledges each one with AckWithResult, waiting for and reporting the
// outcome of every acknowledgment.
//
// It returns the error reported by Receive, if any, instead of discarding it.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	subscription.ReceiveSettings.NumGoroutines = 10 // documented maximum
	err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		ackResult := message.AckWithResult()
		status, err := ackResult.Get(ctx2) // blocks until the acknowledgment is done
		if err != nil {
			// The callback has no error return, so report the failure here.
			fmt.Printf("ack failed for message %s: %v\n", message.ID, err)
			return
		}
		fmt.Printf("ack status for message %s: %v\n", message.ID, status)
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

Receive message synchronously

Receives messages from a subscription synchronously.

Method
TypeMethodDescription
SubscriptionPull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)Consumes messages from the subscription synchronously.
- Supports batch message reception.
Parameters
TypeFieldDescription
intmaxMessagesBatch size.
- Number of messages to receive at once.

pubsub.PulledMessage

Identical to pubsub.Message, but read-only.

TypeFunctionDescription
stringIDUnique ID of the message.
stringDataMessage body.
map[string]stringAttributesMessage attributes.
time.TimePublishTimeTime when the message was published.
*intDeliveryAttemptNumber of times the message has been received.
Example
func (s *Subscription) Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionPull synchronously pulls up to ten messages from the
// subscription subName in one request and prints each of them.
func subscriptionPull(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// maxMessages is the batch size: the number of messages to receive at once.
	maxMessages := 10
	msgs, err := subscription.Pull(ctx, maxMessages)
	if err != nil {
		// Report the failure rather than silently printing nothing.
		return fmt.Errorf("Pull: %v", err)
	}
	for i, msg := range msgs {
		fmt.Printf("message [%v] : %v\n", i, msg)
	}
	return nil
}

Acknowledge message reception

Marks retrieved messages as acknowledged.

Method
TypeMethodDescription
SubscriptionAcknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)Notifies the subscription that messages have been received using their AckIDs.
Parameters
TypeFieldDescription
[]stringackIDsAckIDs of the messages.

pubsub.AcknowledgeResponse

TypeFieldDescription
[]*AcknowledgeFailureResultFailureInformation about failed requests.

pubsub.AcknowledgeFailureResult

TypeFieldDescription
stringAckIDThe AckID of the failed request.
errorErrorReason for the failure.
Example
func (s *Subscription) Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionAck acknowledges the messages identified by ackIDs on the
// subscription subName. Every per-AckID failure contained in the response is
// printed, and a request-level error is returned to the caller.
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	resp, err := subscription.Acknowledge(ctx, ackIDs)
	// Print per-AckID failures first: the nil check keeps this safe whether
	// or not a response accompanies an error.
	if resp != nil {
		for _, failure := range resp.Failure {
			fmt.Printf("ack fail : %v\n", failure)
		}
	}
	if err != nil {
		return fmt.Errorf("Acknowledge: %v", err)
	}
	return nil
}

Modify message acknowledgment deadline


Modifies the acknowledgment deadline (Ack Deadline) for a message.

  • Extend: Extends the message's usage time from the current point by the subscription's AckDeadline duration.
  • Skip: Sets the message's usage time to 0, allowing other clients to receive the message.
Method
TypeMethodDescription
SubscriptionModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction)
(*ModifyAckDeadlineResponse, error)
Extends or reduces the usage period of a message using its AckID.
Parameters
TypeFieldDescription
[]stringackIDsAckIDs of the messages.
AckActionactionIndicates whether to extend the AckDeadline.
pubsub.ActionExtend, pubsub.ActionSkip.

pubsub.ModifyAckDeadlineResponse

TypeFieldDescription
[]*ModifyAckDeadlineSuccessResultSuccessInformation about successful requests.
[]*ModifyAckDeadlineFailureResultFailureInformation about failed requests.

pubsub.ModifyAckDeadlineSuccessResult

TypeFieldDescription
stringAckIDThe original AckID.
stringReissuedAckIDA newly issued AckID after extension.

pubsub.ModifyAckDeadlineFailureResult

TypeFieldDescription
stringAckIDThe AckID of the failed request.
errorErrorReason for the failure.
Example
func (s *Subscription) ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionModifyAckDeadline demonstrates both acknowledgment-deadline
// actions on the messages identified by ackIDs: first pubsub.ActionSkip, then
// pubsub.ActionExtend. The per-AckID results of each call are printed.
//
// NOTE(review): the two actions are shown back to back for illustration; real
// code would presumably apply only one of them to a given set of AckIDs.
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// report prints the per-AckID outcome of one ModifyAckDeadline call.
	report := func(action string, resp *pubsub.ModifyAckDeadlineResponse) {
		if resp == nil {
			return
		}
		for _, ok := range resp.Success {
			fmt.Printf("%s success: %s (reissued AckID: %s)\n", action, ok.AckID, ok.ReissuedAckID)
		}
		for _, failure := range resp.Failure {
			fmt.Printf("%s fail: %s: %v\n", action, failure.AckID, failure.Error)
		}
	}

	// Skip example: sets the usage time to 0 so other clients can receive
	// the messages.
	resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline: %v", err)
	}
	report("skip", resp)

	// Extend example: extends the usage time by the subscription's
	// AckDeadline. Plain assignment, because resp and err already exist.
	resp, err = subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline: %v", err)
	}
	report("extend", resp)
	return nil
}