Skip to main content

Method list

info

Topics and subscriptions can only be created/deleted in the KakaoCloud Console.

Preparation and requirements

For pre-work and requirements for using the Pub/Sub Go SDK, please refer to the Overview.

Topic

Retrieve topic

You can retrieve topics using the items below.

Method
TypeMethodDescription
TopicTopic(name string)Returns a topic object regardless of its actual existence, and can be configured with publish settings, etc.
boolExists(ctx context.Context)Checks on the server to see if the actual topic exists.
TopicConfigConfig(ctx context.Context)Checks topic information
Parameters

There are no parameters.

Example
func (c *Client) Topic(name string) *Topic
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// topicGet looks up the topic called name in the given domain and project,
// prints the topic handle, and then prints its configuration.
//
// It returns an error when the client cannot be created, when the existence
// check fails, when the topic does not exist, or when the configuration
// cannot be fetched.
func topicGet(domainID, projectID, name string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Topic returns a handle whether or not the topic exists; Exists is the
	// call that checks on the server.
	t := client.Topic(name)
	switch found, err := t.Exists(ctx); {
	case err != nil:
		return fmt.Errorf("topic.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found topic")
	}
	fmt.Printf("Topic: %v\n", t)

	cfg, err := t.Config(ctx)
	if err != nil {
		return fmt.Errorf("topic.Config: %v", err)
	}
	fmt.Printf("Topic config : %+v\n", cfg)

	return nil
}

Retrieve topic list

Retrieve the topic list within the project.

Method
TypeMethod
TopicIteratorTopics(ctx context.Context)
Parameters

There are no parameters.

Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicList prints every topic in the given domain and project.
//
// It walks the iterator returned by Client.Topics until iterator.Done is
// reported, and returns the first other error it encounters.
func topicList(domainID, projectID string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topics := client.Topics(ctx)
	for {
		t, nextErr := topics.Next()
		switch {
		case nextErr == iterator.Done:
			// The iterator is exhausted; listing finished successfully.
			return nil
		case nextErr != nil:
			return fmt.Errorf("Next: %v", nextErr)
		}
		fmt.Printf("Topic: %v\n", t)
	}
}

Retrieve subscription list of topic

Retrieve the subscription list for a specific topic.

Method
TypeMethod
SubscriptionIteratorSubscriptions(ctx context.Context)
Parameters

There are no parameters.

Example
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

// topicSubscriptions prints every subscription attached to the topic called
// name in the given domain and project.
//
// Iteration stops cleanly on iterator.Done; any other iterator error is
// returned to the caller.
func topicSubscriptions(domainID, projectID, name string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subs := client.Topic(name).Subscriptions(ctx)
	for {
		s, err := subs.Next()
		if err != nil {
			if err == iterator.Done {
				break
			}
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Subscription: %v\n", s)
	}
	return nil
}

Modify topic

You can modify the retention period and description of a specific topic.

Method
TypeMethod
TopicConfigUpdate(ctx context.Context, cfg TopicConfigToUpdate)
Parameters
TypeFieldDescription
TopicConfigToUpdatecfgAn object containing information about the topic to be modified

pubsub.TopicConfigToUpdate

TypeFieldDescription
time.DurationRetentionDurationMessage retention period
- Default: 604,800s(7 days)
stringDescriptionTopic description
Example
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// topicUpdate changes the retention duration and description of the topic
// called name, then prints the retention duration reported in the updated
// configuration.
//
// duration is the new message retention period. The description is set to
// the fixed example value "new description".
//
// It returns an error when the client cannot be created or when the update
// request fails.
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	topic := client.Topic(name)

	cfg := pubsub.TopicConfigToUpdate{
		RetentionDuration: duration,
		Description:       "new description",
	}
	tc, err := topic.Update(ctx, cfg)
	if err != nil {
		// Error strings carry no trailing newline; the caller decides how
		// the message is formatted.
		return fmt.Errorf("Update: %v", err)
	}
	fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
	return nil
}

Subscription

Retrieve subscription

You can check whether a subscription exists.

Method
TypeMethodDescription
ClientSubscription(name string) *SubscriptionReturns a Subscription object regardless of its actual existence
- You can configure ReceiveSettings, etc.
SubscriptionExists(ctx context.Context) (bool, error)Checks the server for the existence of an actual subscription
SubscriptionConfig(ctx context.Context) (*SubscriptionConfig, error)Check subscription information
Parameters

There are no parameters.

Example
func (c *Client) Subscription(name string) *Subscription
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionGet looks up the subscription called subName in the given
// domain and project, prints the subscription handle, and then prints its
// configuration.
//
// It returns an error when the client cannot be created, when the existence
// check fails, when the subscription does not exist, or when the
// configuration cannot be fetched.
func subscriptionGet(domainID, projectID, subName string) error {
	ctx := context.Background()

	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Subscription returns a handle whether or not the subscription exists;
	// Exists is the call that checks on the server.
	sub := client.Subscription(subName)
	found, err := sub.Exists(ctx)
	switch {
	case err != nil:
		return fmt.Errorf("subscription.Exists: %v", err)
	case !found:
		return fmt.Errorf("not found subscription")
	}
	fmt.Printf("Subscription: %v\n", sub)

	cfg, err := sub.Config(ctx)
	if err != nil {
		return fmt.Errorf("subscription.Config: %v", err)
	}
	fmt.Printf("Subscription config : %+v\n", cfg)
	return nil
}

Retrieve subscription list

Retrieve the subscription list belonging to a specific domain and project set on the client.

Method
TypeMethodDescription
ClientSubscriptions(ctx context.Context) *SubscriptionIteratorReturns an iterator that can retrieve actual subscription information
Parameters

There are no parameters.

Example
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionList prints every subscription in the given domain and
// project.
//
// It walks the iterator returned by Client.Subscriptions until iterator.Done
// is reported, and returns the first other error it encounters.
func subscriptionList(domainID, projectID string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	// Reuse ctx rather than creating a second background context, so a
	// deadline or cancellation added to ctx later also covers the listing.
	it := client.Subscriptions(ctx)
	for {
		subscription, err := it.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("Next: %v", err)
		}
		fmt.Printf("Subscription: %v\n", subscription)
	}
	return nil
}

Modify subscription

Modify a specific subscription. When modifying, you must include at least one item.

NameDescription
AckDeadlineMust be different from the existing settings
- Min: 10 sec
- Max: 600 sec
MessageRetentionDurationMust be different from the existing settings
- Min: 600 sec
- Max: 604800 sec (7 days)
MaxDeliveryAttemptNumber of reprocessing attempts
- Min: 1
- Max: infinite(-1)
PushConfig.EndpointFor 'Push' subscription, it must be different from the existing settings
PushConfig.BatchSizeFor 'Push' subscription, it must be different from the existing settings
Method
TypeMethodDescription
SubscriptionUpdate(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error)Modifies subscription according to SubscriptionConfigToUpdate value
Parameters
TypeFieldDescription
SubscriptionConfigToUpdatecfgSetting values required to modify subscription

pubsub.SubscriptionConfigToUpdate

TypeFieldDescription
time.DurationAckDeadlineMessage response waiting time
time.DurationRetentionDurationMessage retention period
- Default: 604,800s(7 days)
intMaxDeliveryAttemptNumber of reprocessing attempts
- Min: 1
- Max: Infinite
- Default: -1 (Infinite)
*PushConfigPushConfigValues configured for Push subscription

pubsub.PushConfig

TypeFieldDescription
stringEndpointEndpoint of the destination to send the message to
intBatchSizeNumber of messages to send at a time
- Default: 1
Example
func (s *Subscription) Update(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// subscriptionUpdate modifies the subscription called subName with a fixed
// set of example values (push endpoint and batch size, ack deadline,
// retention duration, and maximum delivery attempts) and prints the
// configuration returned by the update.
//
// It returns an error when the client cannot be created or when the update
// request fails.
func subscriptionUpdate(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	updateConfig := pubsub.SubscriptionConfigToUpdate{
		PushConfig: &pubsub.PushConfig{
			// The port belongs to the host part of the URL, before the path.
			Endpoint:  "http://localhost:8888/push",
			BatchSize: 100,
		},
		AckDeadline:        30 * time.Second,
		RetentionDuration:  48 * time.Hour,
		MaxDeliveryAttempt: 20,
	}
	updatedPushSubConfig, err := subscription.Update(ctx, &updateConfig)
	if err != nil {
		// Without this check a failed update would print a nil config and
		// still report success.
		return fmt.Errorf("subscription.Update: %v", err)
	}
	fmt.Printf("Subscription updated config : %+v\n", updatedPushSubConfig)

	return nil
}

Rewind subscription time

You can rewind the subscription time and receive messages again after the specified time.

Method
TypeMethodDescription
SubscriptionSeekToTime(ctx context.Context, t time.Time) errorRewind subscription time
Parameters
TypeFieldDescription
time.TimetTime to rewind the subscription time
Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

// subscriptionSeek rewinds the subscription called subName to one hour
// before the current time, so messages after that point can be received
// again.
//
// It returns an error when the client cannot be created or when the seek
// request fails.
func subscriptionSeek(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	oneHourAgo := time.Now().Add(-1 * time.Hour)
	if err := subscription.SeekToTime(ctx, oneHourAgo); err != nil {
		// Error strings carry no trailing newline; the caller decides how
		// the message is formatted.
		return fmt.Errorf("Seek: %v", err)
	}
	return nil
}

Message

Publish message

You can publish a message to a topic.

Method
TypeMethod
PublishResultPublish(ctx context.Context, msg *Message)
Parameters
TypeFieldDescription
MessagemsgMessage to publish

pubsub.Message

TypeFieldDescription
stringDataMessage body
⚠️ Must be encoded in Base64.
map[string]stringAttributesMessage attributes
map in the format of [key, value]
⚠️ Keys starting with kakaoc cannot be used.

pubsub.PublishSettings

TypeFieldDescription
time.DurationDelayThresholdSends a publish request to the server at the set time interval
intCountThresholdCollects and sends messages in the set number
intNumGoroutinesLimits the number of go routines running simultaneously
time.DurationTimeoutSets the timeout time for requests to the server
FlowControlSettingsFlowControlSettingsControls the publishing speed through flow control

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesSpecifies the maximum number of messages that have not yet been published
intMaxOutstandingBytesSpecifies the maximum size of messages that have not yet been published
LimitExceededBehaviorLimitExceededBehaviorDefines the behavior when the limit is exceeded
- ignore
- block
- signal error(default)

pubsub.PublishResult

TypeFunctionDescription
stringGet(ctx context.Context)Gets the message id information that was actually published to the server
- Blocks until it is published to the server
Example
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
import (
	"context"
	"encoding/base64"
	"fmt"

	"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg))
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}

Receive message asynchronously

Receives a message from the subscription asynchronously.

Method
TypeMethodDescription
SubscriptionReceive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) errorReceives a message from a subscription by streaming, and executes a callback function
Parameters
TypeFieldDescription
func(ctx2 context.Context, message *Message)callbackFuncCallback function to execute after receiving a message

pubsub.ReceiveSettings

TypeFieldDescription
intNumGoroutinesLimits the number of goroutines that can be executed simultaneously
- Default: 2
- Max: 10
FlowControlSettingsFlowControlSettingsControls the reception speed with flow control

pubsub.FlowControlSettings

TypeFieldDescription
intMaxOutstandingMessagesSpecifies the maximum number of messages that can be processed simultaneously
- Default: 1000
intMaxOutstandingBytesSpecifies the maximum size of messages that can be processed simultaneously
- Default: 1G
LimitExceededBehaviorLimitExceededBehaviorDefines the behavior when the limit is exceeded
- ignore
- block(default)
- signal error

pubsub.Message

TypeFieldDescription
stringIDUnique ID for the message
stringDataMessage body
map[string]stringAttributesMessage Attributes
time.TimePublishTimeThe time the message was published
*intDeliveryAttemptThe number of times the message was received
Example
func (s *Subscription) Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive receives messages from the subscription called subName
// by streaming and acknowledges each one in the callback.
//
// NumGoroutines is set to 10 before receiving starts.
//
// It returns an error when the client cannot be created or when Receive
// itself returns an error.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)
	subscription.ReceiveSettings.NumGoroutines = 10
	err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		message.Ack()
	})
	if err != nil {
		// Receive reports failures through its return value; surface them
		// instead of always returning nil.
		return fmt.Errorf("subscription.Receive: %v", err)
	}
	return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionReceive receives messages from the subscription called subName
// by streaming and acknowledges each one with AckWithResult, waiting for the
// outcome of every acknowledgement.
//
// NumGoroutines is set to 10 before receiving starts. A failed
// acknowledgement is printed from inside the callback, because the callback
// has no return value to report it through.
//
// It returns an error when the client cannot be created or when Receive
// itself returns an error.
func subscriptionReceive(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	subscription.ReceiveSettings.NumGoroutines = 10
	err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
		// Process message
		ackResult := message.AckWithResult()
		// Get blocks until the acknowledgement is done.
		status, ackErr := ackResult.Get(ctx2)
		if ackErr != nil {
			fmt.Printf("ack failed (status %v): %v\n", status, ackErr)
		}
	})
	if err != nil {
		return fmt.Errorf("subscription.Receive: %v", err)
	}
	return nil
}

Receive message synchronously

Receives messages from a subscription synchronously.

Method
TypeMethodDescription
SubscriptionPull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)Consumes messages from a subscription synchronously
- Can receive messages in batches
Parameters
TypeFieldDescription
intmaxMessagesBatch size
- Number of messages to receive at a time

pubsub.PulledMessage

Same as pubsub.Message, but read-only.

TypeFieldDescription
stringIDUnique ID of the message
stringDataMessage body
map[string]stringAttributesMessage attributes
time.TimePublishTimeTime the message was published
*intDeliveryAttemptNumber of times the message was received
Example
func (s *Subscription) Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionPull synchronously pulls up to 10 messages from the
// subscription called subName and prints each one with its index.
//
// It returns an error when the client cannot be created or when the pull
// request fails.
func subscriptionPull(domainID, projectID, subName string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// maxMessages is the batch size: the number of messages to receive at a
	// time.
	maxMessages := 10
	msgs, err := subscription.Pull(ctx, maxMessages)
	if err != nil {
		// Without this check a failed pull would look like an empty batch.
		return fmt.Errorf("subscription.Pull: %v", err)
	}
	for i, msg := range msgs {
		fmt.Printf("message [%v] : %v\n", i, msg)
	}
	return nil
}

Acknowledge message reception

Acknowledge the retrieved message.

Method
TypeMethodDescription
SubscriptionAcknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)Notifies the subscription that a message has been received using the AckID of the message
Parameters
TypeFieldDescription
[]stringackIDsAckID of the message

pubsub.AcknowledgeResponse

TypeFieldDescription
[]*AcknowledgeFailureResultFailureInformation about the request failure

pubsub.AcknowledgeFailureResult

TypeFieldDescription
stringAckIDAckID of the request failure
errorErrorReason for the request failure
Example
func (s *Subscription) Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionAck acknowledges the messages identified by ackIDs on the
// subscription called subName and prints every per-message failure listed in
// the response.
//
// It returns an error when the client cannot be created or when the
// acknowledge request itself fails. Per-message failures are only printed;
// they do not make the function return an error.
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	resp, err := subscription.Acknowledge(ctx, ackIDs)
	if err != nil {
		// Check the error before touching resp, which is not usable when
		// the request failed.
		return fmt.Errorf("subscription.Acknowledge: %v", err)
	}
	for _, failure := range resp.Failure {
		fmt.Printf("ack fail : %v\n", failure)
	}
	return nil
}

Modify message ACK deadline

Modify message ACK deadline.

  • Extend: You can extend the message's usage time from the current time by the subscription's AckDeadline time.
  • Skip: Set the message's usage time to 0, allowing other clients to receive the message.
Method
TypeMethodDescription
SubscriptionModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction)
(*ModifyAckDeadlineResponse, error)
Increase or decrease the lifetime of a message using the AckID of the message
Parameters
TypeFieldDescription
[]stringackIDsThe AckID of the message
AckActionactionWhether to extend the AckDeadline of the message
pubsub.ActionExtend, pubsub.ActionSkip

pubsub.ModifyAckDeadlineResponse

TypeFieldDescription
[]*ModifyAckDeadlineSuccessResultSuccessRequest success information
[]*ModifyAckDeadlineFailureResultFailureRequest failure information

pubsub.ModifyAckDeadlineSuccessResult

TypeFieldDescription
stringAckIDExisting AckID
stringReissuedAckIDNewly issued AckID while extending

pubsub.ModifyAckDeadlineFailureResult

TypeFieldDescription
stringAckIDAckID that failed the request
errorErrorReason for request failure
Example
func (s *Subscription) ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

// subscriptionModifyAckDeadline demonstrates both ack-deadline actions on
// the messages identified by ackIDs for the subscription called subName:
// first ActionSkip, then ActionExtend. The per-message results of each call
// are printed.
//
// It returns an error when the client cannot be created or when either
// ModifyAckDeadline request fails. Per-message failures are only printed.
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	subscription := client.Subscription(subName)

	// Skip example: sets the messages' usage time to 0 so other clients can
	// receive them.
	resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline: %v", err)
	}
	for _, failure := range resp.Failure {
		fmt.Printf("skip fail : %v\n", failure)
	}

	// Extend example: extends the messages' usage time by the subscription's
	// AckDeadline. resp and err already exist, so assign with = rather than
	// redeclaring them with :=.
	resp, err = subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
	if err != nil {
		return fmt.Errorf("ModifyAckDeadline: %v", err)
	}
	for _, success := range resp.Success {
		// Extending issues a new AckID for each message.
		fmt.Printf("extended : %v -> %v\n", success.AckID, success.ReissuedAckID)
	}
	for _, failure := range resp.Failure {
		fmt.Printf("extend fail : %v\n", failure)
	}
	return nil
}