Method list
Topics and subscriptions can only be created/deleted in the KakaoCloud Console.
Preparation and requirements
For pre-work and requirements for using the Pub/Sub Go SDK, please refer to the Overview.
Topic
Retrieve topic
You can retrieve topics using the items below.
Method
Type | Method | Description |
---|---|---|
Topic | Topic(name string) | Returns a topic object regardless of its actual existence, and can be configured with publish settings, etc. |
bool | Exists(ctx context.Context) | Checks on the server to see if the actual topic exists. |
TopicConfig | Config(ctx context.Context) | Checks topic information |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
Retrieve topic list
Retrieve the topic list within the project.
Method
Type | Method |
---|---|
TopicIterator | Topics(ctx context.Context) |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
Retrieve subscription list of topic
Retrieve the subscription list for a specific topic.
Method
Type | Method |
---|---|
SubscriptionIterator | Subscriptions(ctx context.Context) |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
Modify topic
You can modify the retention period and description of a specific topic.
Method
Type | Method |
---|---|
TopicConfig | Update(ctx context.Context, cfg TopicConfigToUpdate) |
Parameters
Type | Field | Description |
---|---|---|
TopicConfigToUpdate | cfg | An object containing information about the topic to be modified |
pubsub.TopicConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days) |
string | Description | Topic description |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
Subscription
Retrieve subscription
You can check whether a subscription exists.
Method
Type | Method | Description |
---|---|---|
Client | Subscription(name string) *Subscription | Returns a Subscription object regardless of its actual existence - You can configure ReceiveSettings, etc. |
Subscription | Exists(ctx context.Context) (bool, error) | Checks the server for the existence of an actual subscription |
Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | Check subscription information |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)
sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}
Retrieve subscription list
Retrieve the subscription list belonging to a specific domain and project set on the client.
Method
Type | Method | Description |
---|---|---|
Client | Subscriptions(ctx context.Context) *SubscriptionIterator | Returns an iterator that can retrieve actual subscription information |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}
Modify subscription
Modify a specific subscription. When modifying, you must include at least one item.
Name | Description |
---|---|
AckDeadline | Must be different from the existing settings - Min: 10 sec - Max: 600 sec |
MessageRetentionDuration | Must be different from the existing settings - Min: 600 sec - Max: 604800 sec (7 days) |
MaxDeliveryAttempt | Number of reprocessing attempts - Min: 1 - Max: infinite( -1 ) |
PushConfig.Endpoint | For 'Push' subscription, it must be different from the existing settings |
PushConfig.BatchSize | For 'Push' subscription, it must be different from the existing settings |
Method
Type | Method | Description |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error) | Modifies subscription according to SubscriptionConfigToUpdate value |
Parameters
Type | Field | Description |
---|---|---|
SubscriptionConfigToUpdate | cfg | Setting values required to modify subscription |
pubsub.SubscriptionConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | AckDeadline | Message response waiting time |
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days) |
int | MaxDeliveryAttempt | Number of reprocessing attempts - Min: 1 - Max: Infinite - Default: -1 (Infinite) |
*PushConfig | PushConfig | Values configured for Push subscription |
pubsub.PushConfig
Type | Field | Description |
---|---|---|
string | Endpoint | Endpoint of the destination to send the message to |
int | BatchSize | Number of messages to send at a time - Default: 1 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost/push:8888",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
updatedPushSubConfig, err := subscription.Update(context.Background(), &updateConfig)
fmt.Printf("Subscription updated config : %+v\n", updatedPushSubConfig)
return nil
}
Rewind subscription time
You can rewind the subscription time and receive messages again after the specified time.
Method
Type | Method | Description |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time) error | Rewind subscription time |
Parameters
Type | Field | Description |
---|---|---|
time.Time | t | Time to rewind the subscription time |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
err = subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour))
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
Message
Publish message
You can publish a message to a topic.
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | Description |
---|---|---|
Message | msg | Message to publish |
pubsub.Message
Type | Field | Description |
---|---|---|
string | Data | Message body ⚠️ Must be encoded in Base64. |
map[string]string | Attributes | Message attributes map in the format of [key, value] ⚠️ Keys starting with kakaoc cannot be used. |
pubsub.PublishSettings
Type | Field | Description |
---|---|---|
time.Duration | DelayThreshold | Sends a publish request to the server at the set time interval |
int | CountThreshold | Collects and sends messages in the set number |
int | NumGoroutines | Limits the number of go routines running simultaneously |
time.Duration | Timeout | Sets the timeout time for requests to the server |
FlowControlSettings | FlowControlSettings | Controls the reception speed through flow control |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Specifies the maximum number of messages that have not yet been published |
int | MaxOutstandingBytes | Specifies the maximum size of messages that have not yet been published |
LimitExceededBehavior | LimitExceededBehavior | Defines the behavior when the limit is exceeded - ignore - block - signal error (default) |
pubsub.PublishResult
Type | Function | Description |
---|---|---|
string | Get(ctx context.Context) | Gets the message id information that was actually published to the server - Blocks until it is published to the server |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg))
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
Receive message asynchronously
Receives a message from the subscription asynchronously.
Method
Type | Method | Description |
---|---|---|
Subscription | Receives(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message) error | Receives a message from a subscription by streaming, and executes a callback function |
Parameters
Type | Field | Description |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | Callbacks function to execute after receiving a message |
pubsub.ReceiveSettings
Type | Field | Description |
---|---|---|
int | NumGoroutines | Limits the number of goroutines that can be executed simultaneously - Default: 2 - Max: 10 |
FlowControlSettings | FlowControlSettings | Controls publishing speed with flow control |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Specifies the maximum number of messages that can be processed simultaneously - Default: 1000 |
int | MaxOutstandingBytes | Specifies the maximum size of messages that can be processed simultaneously - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | Defines the behavior when the limit is exceeded - ignore - block (default)- signal error |
pubsub.Message
Type | Function | Description |
---|---|---|
string | ID | Unique ID for the message |
string | Data | Message body |
map[string]string | Attributes | Message Attributes |
time.Time | PublishTime | The time the message was published |
*int | DeliveryAttempt | The number of times the message was received |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}
Receive message synchronously
Receives messages from a subscription synchronously.
Method
Type | Method | Description |
---|---|---|
Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | Consumes messages from a subscription synchronously - Can receive messages in batches |
Parameters
Type | Field | Description |
---|---|---|
int | maxMessages | Batch size - Number of messages to receive at a time |
pubsub.PulledMessage
Same as pubsub.Message, but read-only.
Type | Function | Description |
---|---|---|
string | ID | Unique ID of the message |
string | Data | Message body |
map[string]string | Attributes | Message attributes |
time.Time | PublishTime | Time the message was published |
*int | DeliveryAttempt | Number of messages received |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}
Acknowledge message reception
Acknowledge the retrieved message.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | Notifies the subscription that a message has been received using the AckID of the message |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | AckID of the message |
pubsub.AcknowledgeResponse
Type | Field | Description |
---|---|---|
[]*AcknowledgeFailureResult | Failure | Information about the request failure |
pubsub.AcknowledgeFailureResult
Type | Field | Description |
---|---|---|
string | AckID | AckID of the request failure |
error | Error | Reason for the request failure |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
return nil
}
Modify message ACK deadline
Modify message ACK deadline.
Extend
: You can extend the message's usage time from the current time by the subscription's AckDeadline time.Skip
: Set the message's usage time to 0, allowing other clients to receive the message.
Method
Type | Method | Description |
---|---|---|
Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | Increase or decrease the lifetime of a message using the AckID of the message |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | The AckID of the message |
AckAction | action | Whether to extend the AckDeadline of the messagepubsub.ActionExtend, pubsub.ActionSkip |
pubsub.ModifyAckDeadlineResponse
Type | Field | Description |
---|---|---|
[]*ModifyAckDeadlineSuccessResult | Success | Request success information |
[]*ModifyAckDeadlineFailureResult | Failure | Request failure information |
pubsub.ModifyAckDeadlineSuccessResult
Type | Field | Description |
---|---|---|
string | AckID | Existing AckID |
string | ReissuedAckID | Newly issued AckID while extending |
pubsub.ModifyAckDeadlineFailureResult
Type | Field | Description |
---|---|---|
string | AckID | AckID that failed the request |
error | Error | Reason for request failure |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
// Skip 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}