Method list
Preparation and requirements
For prerequisites and requirements to use the Pub/Sub Go SDK, refer to the SDK overview documentation.
Topic
Retrieve topic
You can retrieve a topic using the methods below.
Method
Type | Method | Description |
---|---|---|
Client | Topic(name string) *Topic | Returns a topic object regardless of existence; allows configuration such as Publish Setting |
Topic | Exists(ctx context.Context) (bool, error) | Checks from the server whether the topic actually exists |
Topic | Config(ctx context.Context) (*TopicConfig, error) | Retrieves information about the topic |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
Retrieve topic list
Retrieves the list of topics in a project.
Method
Type | Method |
---|---|
Client | Topics(ctx context.Context) *TopicIterator |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
Retrieve subscription list of topic
Retrieves the list of subscriptions for a specific topic.
Method
Type | Method |
---|---|
Topic | Subscriptions(ctx context.Context) *SubscriptionIterator |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
Modify topic
You can update the retention period and description of a specific topic.
Method
Type | Method |
---|---|
Topic | Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error) |
Parameters
Type | Field | Description |
---|---|---|
TopicConfigToUpdate | cfg | Object containing information to update the topic |
pubsub.TopicConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days) |
string | Description | Topic description |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := &pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
Create topic
You can create a new topic using the methods below.
Method
Type | Method | Description |
---|---|---|
Client | CreateTopic(ctx context.Context, name string) (*Topic, error) | Retention period is set to default (7 days) |
Client | CreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error) | Allows specifying topic configuration |
Parameters
Type | Field | Description |
---|---|---|
TopicConfig | cfg | Object containing information about the topic to be created |
pubsub.TopicConfig
Type | Field | Description |
---|---|---|
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days) |
string | Description | Topic description |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicCreate(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, name)
if err != nil {
return fmt.Errorf("CreateTopic: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
config := pubsub.TopicConfig{
RetentionDuration: duration,
Description: "topic description",
}
topic, err := client.CreateTopicWithConfig(ctx, name, &config)
if err != nil {
return fmt.Errorf("CreateTopicWithConfig: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
Delete topic
You can delete a topic using the method below.
When a topic is deleted, all associated subscriptions are also deleted.
Method
Type | Method |
---|---|
Topic | Delete(ctx context.Context) error |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicDelete(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
err = topic.Delete(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
return nil
}
Subscription
Retrieve subscription
Check whether a subscription exists.
Method
Type | Method | Description |
---|---|---|
Client | Subscription(name string) *Subscription | Returns a subscription object regardless of existence - Enables configuration such as ReceiveSettings |
Subscription | Exists(ctx context.Context) (bool, error) | Checks from the server whether the subscription actually exists |
Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | Retrieves information about the subscription |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)
sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}
Retrieve subscription list
Retrieves the list of subscriptions belonging to the domain and project set in the client.
Method
Type | Method | Description |
---|---|---|
Client | Subscriptions(ctx context.Context) *SubscriptionIterator | Returns an iterator for retrieving actual subscription info |
Parameters
No parameters.
Example
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Subscriptions(ctx)
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}
Modify subscription
Updates a subscription. At least one field must be provided for modification.
Method
Type | Method | Description |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult | Updates the subscription based on SubscriptionConfigToUpdate values |
Parameters
Type | Field | Description |
---|---|---|
SubscriptionConfigToUpdate | cfg | Configuration values for the update |
pubsub.SubscriptionConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | AckDeadline | Acknowledgment deadline |
time.Duration | RetentionDuration | Message retention duration - Default: 604,800s (7 days) |
int | MaxDeliveryAttempt | Max delivery attempts - Min: 1 , Max: infinite (1~100) - Default: -1 |
PushConfig | PushConfig | Configuration for Push subscription |
ObjectStorageConfig | ObjectStorageConfig | Configuration for Object Storage subscription |
pubsub.PushConfig
Type | Field | Description |
---|---|---|
string | Endpoint | Target endpoint to deliver messages |
int | BatchSize | Number of messages sent per batch - Default: 1 |
pubsub.ObjectStorageConfig
Type | Field | Description |
---|---|---|
string | Bucket | Name of Object Storage bucket |
int | ExportIntervalMinutes | Export interval to Object Storage - Default: 5 |
string | FilePrefix | Prefix for exported file |
string | FileSuffix | Suffix for exported file |
pubsub.SubscriptionResult
Method | Description |
---|---|
Get(ctx context.Context) (string, error) | Retrieves the final status of the update request (blocks until complete). ⚠️ Enabled only with option.WithWaitStatus() |
Name() string | Returns the name of the requested subscription |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := &pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:8888/push",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
result := subscription.Update(ctx, updateConfig, option.WithWaitStatus())
status, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)
return nil
}
Rewind subscription time
Rewind a subscription to a specified point in time and retrieve all messages published after that time.
Method
Type | Method | Description |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult | Rewinds the subscription to the specified time |
Parameters
Type | Field | Description |
---|---|---|
time.Time | t | Time to which the subscription should be rewound |
pubsub.SubscriptionResult
Type | Method | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Returns the final state of the subscription request (blocking until complete). ⚠️ Requires option.WithWaitStatus() |
SubscriptionResult | Name() string | Returns the name of the requested subscription |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
Create subscription
You can create a subscription using the following method.
Method
Type | Method | Description |
---|---|---|
Client | CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult | Creates a subscription (default message retention: 7 days) |
Parameters
Type | Field | Description |
---|---|---|
SubscriptionConfig | cfg | Object containing subscription settings |
pubsub.SubscriptionConfig
Type | Field | Description |
---|---|---|
string | Topic | Topic name |
time.Duration | AckDeadline | Acknowledgment deadline |
time.Duration | RetentionDuration | Message retention duration - Default: 604,800s (7 days) |
PushConfig | PushConfig | Settings for push subscription |
ObjectStorageConfig | ObjectStorageConfig | Settings for object storage subscription |
int | MaxDeliveryAttempt | Max delivery attempts - Default: 1 (unlimited) |
pubsub.PushConfig
Type | Field | Description |
---|---|---|
string | Endpoint | Destination endpoint for messages |
int | BatchSize | Number of messages per batch - Default: 1 |
pubsub.ObjectStorageConfig
Type | Field | Description |
---|---|---|
string | Bucket | Object Storage bucket name |
int | ExportIntervalMinutes | Export interval (min: 1 , max: 10 , default: 5 ) ⚠️ At least one file is created per interval. Files may also be created before interval ends |
string | FilePrefix | File prefix |
string | FileSuffix | File suffix |
pubsub.SubscriptionResult
Type | Method | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Returns the final state of the subscription request (blocking) ⚠️ Requires option.WithWaitStatus() |
SubscriptionResult | Name() string | Name of the created subscription |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
"time"
)
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
pullSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
}
pushSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:9999/push",
BatchSize: 1,
},
}
objectStorageSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
RetentionDuration: 24 * time.Hour,
ObjectStorageConfig: &pubsub.ObjectStorageConfig{
Bucket: "bucket",
ExportIntervalMinutes: 2,
FilePrefix: "prefix",
FileSuffix: "suffix",
},
}
result := client.CreateSubscription(ctx, pullSubName, pullSubConfig, option.WithWaitStatus())
status, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
result = client.CreateSubscription(ctx, pushSubName, pushSubConfig, option.WithWaitStatus())
status, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
result = client.CreateSubscription(ctx, objectStorageSubName, objectStorageSubConfig, option.WithWaitStatus())
status, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
return nil
}
Delete subscription
Delete a subscription using the method below.
Method
Type | Method | Description |
---|---|---|
Subscription | Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult | Deletes subscription |
Parameters
pubsub.SubscriptionResult
Type | Method | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Returns final subscription status (blocking). ⚠️ Requires option.WithWaitStatus() |
SubscriptionResult | Name() string | Name of the subscription request |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go/option"
)
func subscriptionDelete(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.Delete(ctx, option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
fmt.Printf("Subscription deleted %s\n", result.Name())
return nil
}
Message
Publish message
You can publish messages to a topic.
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | Description |
---|---|---|
Message | msg | Message to be published |
pubsub.Message
Type | Field | Description |
---|---|---|
string | Data | Message body ⚠️ Must be Base64-encoded |
map[string]string | Attributes | Message attributes in key-value map format ⚠️ Keys starting with kakaoc are not allowed |
pubsub.PublishSettings
Type | Field | Description |
---|---|---|
time.Duration | DelayThreshold | Send batched messages to server at this interval - Default: 100ms |
int | CountThreshold | Batch this many messages before sending - Default: 100 |
int | NumGoroutines | Max number of goroutines - Default: 2 , Max: 10 |
time.Duration | Timeout | Timeout for server requests - Default: 60s |
FlowControlSettings | FlowControlSettings | Controls message flow to prevent overload |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Max number of unprocessed messages - Default: 1000 |
int | MaxOutstandingBytes | Max size of unprocessed messages - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | Behavior when limit exceeded: - ignore : No flow control - block (default): Wait - signal error : Return error |
pubsub.PublishResult
Type | Method | Description |
---|---|---|
string | Get(ctx context.Context) | Retrieves message ID from server - Blocks until message is published |
Example
import (
"context"
"encoding/base64"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
Attributes: attributes,
})
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
Receive message asynchronously
Receive subscription messages asynchronously.
Available only for subscriptions of type Pull
.
Method
Type | Method | Description |
---|---|---|
Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | Receives messages via streaming and executes callback function |
Parameters
Type | Field | Description |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | Callback function triggered per message |
pubsub.ReceiveSettings
Type | Field | Description |
---|---|---|
int | NumGoroutines | Max number of concurrent goroutines - Default: 2 , Max: 10 |
FlowControlSettings | FlowControlSettings | Controls message flow |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Max number of unacknowledged messages - Default: 1000 |
int | MaxOutstandingBytes | Max size of unacknowledged messages - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | Behavior when limits exceeded - ignore : No flow control - block (default): Wait - signal error : Return error |
pubsub.Message
Type | Function | Description |
---|---|---|
string | ID | Unique message ID |
string | Data | Message content |
map[string]string | Attributes | Message attributes |
time.Time | PublishTime | Publish timestamp |
*int | DeliveryAttempt | Delivery attempt count |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
return subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
return subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) // blocks until acknowledge is complete
if err != nil {
fmt.Printf("Ack error: %v\n", err)
}
fmt.Printf("Ack status: %s\n", status)
})
}
Receive message synchronously
Receive messages from a subscription synchronously.
Method
Type | Method | Description |
---|---|---|
Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | Retrieves messages in batch from subscription synchronously |
Parameters
Type | Field | Description |
---|---|---|
int | maxMessages | Number of messages to pull in a batch |
pubsub.PulledMessage
Read-only version of pubsub.Message
.
Type | Function | Description |
---|---|---|
string | ID | Message ID |
string | Data | Message content |
map[string]string | Attributes | Message attributes |
time.Time | PublishTime | Time when published |
*int | DeliveryAttempt | Number of deliveries |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
if err != nil {
return fmt.Errorf("subscription.Pull: %v", err)
}
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}
Acknowledge message reception
Send acknowledgment for received messages using their AckIDs.
Method
Type | Method | Description |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | Informs the system that the messages have been handled |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | AckIDs list |
pubsub.AcknowledgeResponse
Type | Field | Description |
---|---|---|
[]*AcknowledgeFailureResult | Failure | Failed acknowledgment |
pubsub.AcknowledgeFailureResult
Type | Field | Description |
---|---|---|
string | AckID | Failed AckID |
error | Error | Reason for failure |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
if err != nil {
return fmt.Errorf("Acknowledge: %v", err)
}
if resp != nil {
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
}
return nil
}
Modify message ack deadline
Change the message acknowledgment deadline.
Extend
: Extends ack deadline by the subscription’s configured AckDeadline.Skip
: Sets ack deadline to zero, making the message immediately available for redelivery.
Method
Type | Method | Description |
---|---|---|
Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | Modifies AckDeadline based on action specified |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | AckIDs of the messages |
AckAction | action | pubsub.ActionExtend or pubsub.ActionSkip |
pubsub.ModifyAckDeadlineResponse
Type | Field | Description |
---|---|---|
[]*ModifyAckDeadlineSuccessResult | Success | Successful results |
[]*ModifyAckDeadlineFailureResult | Failure | Failed results |
pubsub.ModifyAckDeadlineSuccessResult
Type | Field | Description |
---|---|---|
string | AckID | Original AckID |
string | ReissuedAckID | New AckID after extend |
pubsub.ModifyAckDeadlineFailureResult
Type | Field | Description |
---|---|---|
string | AckID | Failed AckID |
error | Error | Reason for failure |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
// Skip example
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline (skip): %v", err)
}
// Extend example
resp, err = subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline (extend): %v", err)
}
return nil
}