Method list
Preparation and requirements
For preparation and requirements to use the Pub/Sub Go SDK, refer to the SDK Overview document.
Topic
Create topic
You can create a new topic using the following items.
Method
Type | Method | Description |
---|---|---|
Client | CreateTopic(ctx context.Context, name string) (*Topic, error) | The message retention period is set to the default value (7 days). |
Client | CreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error) |
Parameters
Type | Method | Description |
---|---|---|
TopicConfig | cfg | An object containing information about the topic to be created. |
pubsub.TopicConfig
Type | Field | Description |
---|---|---|
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days). |
string | Description | Topic description. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicCreate(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, name)
if err != nil {
return fmt.Errorf("CreateTopic: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
config := pubsub.TopicConfig{
RetentionDuration: duration,
Description: "topic description",
}
topic, err := client.CreateTopicWithConfig(ctx, name, &config)
if err != nil {
return fmt.Errorf("CreateTopicWithConfig: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
Retrieve topic
You can retrieve a topic using the following items.
Method
Type | Method | Description |
---|---|---|
Client | Topic(name string) *Topic | Returns a topic object regardless of its actual existence, allowing you to configure settings such as publish settings. |
Topic | Exists(ctx context.Context) (bool, error) | Checks the server to determine if the topic actually exists. |
Topic | Config(ctx context.Context) (*TopicConfig, error) | Retrieves information about the topic. |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
Retrieve topic list
Retrieves the list of topics within the project.
Method
Type | Method |
---|---|
Client | Topics(ctx context.Context) *TopicIterator |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
토픽의 서브스크립션 목록 조회
특정 토픽의 서브스크립션 목록을 조회합니다.
Method
Type | Method |
---|---|
Topic | Subscriptions(ctx context.Context) *SubscriptionIterator |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
Modify topic
Allows modification of the retention period and description of a specific topic.
Method
Type | Method |
---|---|
Topic | Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error) |
Parameters
Type | Field | Description |
---|---|---|
TopicConfigToUpdate | cfg | An object containing information about the topic to be modified. |
pubsub.TopicConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days). |
string | Description | Topic descriptio |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := &pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
Delete topic
You can delete a topic using the following items.
When a topic is deleted, its associated subscriptions are also deleted.
Method
Type | Method |
---|---|
Topic | Delete(ctx context.Context) error |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicDelete(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
err = topic.Delete(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
return nil
}
Subscription
Create subscription
You can create a subscription using the following items.
Method
Type | Method | Description |
---|---|---|
Client | CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult | The message retention period is set to the default value (7 days). |
Parameters
Type | Field | Description |
---|---|---|
SubscriptionConfig | cfg | An object containing information about the subscription to be created. |
pubsub.SubscriptionConfig
Type | Field | Description |
---|---|---|
string | Topic | Topic name. |
time.Duration | AckDeadline | Message acknowledgment waiting time. |
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days). |
PushConfig | PushConfig | Settings for PushSubscription. |
ObjectStorageConfig | ObjectStorageConfig | Settings for ObjectStorageSubscription. |
int | MaxDeliveryAttempt | Number of retry attempts - Default: 1 (infinite). |
pubsub.PushConfig
Type | Field | Description |
---|---|---|
string | Endpoint | Destination endpoint for sending messages. |
int | BatchSize | Number of messages to send at once - Default: 1 . |
pubsub.ObjectStorageConfig
Type | Field | Description |
---|---|---|
string | Bucket | Object Storage bucket name. |
int | ExportIntervalMinutes | Frequency for storing to Object Storage - Min: 1 min - Max: 10 min - Default: 5 min.⚠️ At least one file is created during the interval, but files may be created before the interval ends. |
string | FilePrefix | Prefix for files stored in Object Storage. |
string | FileSuffix | Suffix for files stored in Object Storage. |
pubsub.SubscriptionResult
Type | Field | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Retrieves the state of the completed subscription request and blocks until the request is complete. ⚠️ Activated when the option.WithWaitStatus() option is set. |
SubscriptionResult | Name() string | Name of the requested subscription. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
pullSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
}
pushSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:9999/push",
BatchSize: 1,
},
}
objectStorageSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
RetentionDuration: 24 * time.Hour,
ObjectStorageConfig: &pubsub.ObjectStorageConfig{
Bucket: "bucket",
ExportIntervalMinutes: 2,
FilePrefix: "prefix",
FileSuffix: "suffix",
},
}
result := client.CreateSubscription(ctx, pullSubName, pullSubConfig, option.WithWaitStatus())
success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success :%s\n", result.Name(), status)
result = client.CreateSubscription(ctx, pushSubName, pushSubConfig, option.WithWaitStatus())
success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
result = client.CreateSubscription(ctx, objectStorageSubName, objectStorageSubConfig, option.WithWaitStatus())
success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
return nil
}
Retrieve subscription
Checks the existence of a subscription.
Method
Type | Method | Description |
---|---|---|
Client | Subscription(name string) *Subscription | Returns a subscription object regardless of its actual existence. - Allows configuration of settings such as ReceiveSettings . |
Subscription | Exists(ctx context.Context) (bool, error) | Queries the server to check if the subscription actually exists. |
Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | Retrieves information about the subscription. |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)
sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}
Retrieve subscription list
Retrieves the list of subscriptions belonging to the domain and project configured in the client.
Method
Type | Method | Description |
---|---|---|
Client | Subscriptions(ctx context.Context) *SubscriptionIterator | Returns an iterator that retrieves actual subscription information. |
Parameters
There are no parameters.
Example
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}
Modify subscription
Modifies a subscription. At least one item must be included for modification.
Name | Description |
---|---|
AckDeadline | Must differ from the existing value. - Min: 10 sec - Max: 600 sec |
MessageRetentionDuration | Must differ from the existing value. - Min: 600 sec - Max: 604800 sec (7 days) |
MaxDeliveryAttempt | Number of retry attempts. - Min: 1 - Max: Infinite (values between 1–100 can be specified). - Default: -1 |
PushConfig.Endpoint | Required only for PushSubscriptions and must differ from the existing value. |
PushConfig.BatchSize | Required only for PushSubscriptions and must differ from the existing value. - Default: 1 (unlimited). |
ObjectStorageConfig.Bucket | Required only for ObjectStorageSubscriptions. |
ObjectStorageConfig.ExportIntervalMinutes | - Min: 1 min- Max: 10 min- Default: 5 min |
ObjectStorageConfig.FilePrefix | For ObjectStorageSubscriptions, must differ from the existing value. |
ObjectStorageConfig.FileSuffix | For ObjectStorageSubscriptions, must differ from the existing value. |
Method
Type | Method | Description |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult | Modifies the subscription based on the SubscriptionConfigToUpdate values. |
Parameters
Type | Field | Description |
---|---|---|
SubscriptionConfigToUpdate | cfg | Configuration values required for modifying a subscription. |
pubsub.SubscriptionConfigToUpdate
Type | Field | Description |
---|---|---|
time.Duration | AckDeadline | Message acknowledgment waiting time. |
time.Duration | RetentionDuration | Message retention period - Default: 604,800s (7 days). |
int | MaxDeliveryAttempt | Number of retry attempts - Min: 1 - Max: Infinite (values between 1–100 can be specified). - Default: -1 . |
PushConfig | PushConfig | Settings for PushSubscription. |
ObjectStorageConfig | ObjectStorageConfig | Settings for ObjectStorageSubscription. |
pubsub.PushConfig
Type | Field | Description |
---|---|---|
string | Endpoint | Endpoint for sending messages. |
int | BatchSize | Number of messages to send at once - Default: 1 . |
pubsub.ObjectStorageConfig
Type | Field | Description |
---|---|---|
string | Bucket | Object Storage bucket name. |
int | ExportIntervalMinutes | Frequency for storing to Object Storage - Default: 5 . |
string | FilePrefix | Prefix for files stored in Object Storage. |
string | FileSuffix | Suffix for files stored in Object Storage. |
pubsub.SubscriptionResult
Type | Field | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Retrieves the state of the completed subscription request and blocks until the request is complete. ⚠️ Activated when the option.WithWaitStatus() option is set. |
SubscriptionResult | Name() string | Name of the requested subscription. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := &pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:8888/push",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
result := subscription.Update(context.Background(), updateConfig, option.WithWaitStatus())
success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)
return nil
}
Delete subscription
You can delete a subscription using the following items.
Method
Type | Method | Description |
---|---|---|
Subscription | Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult | Deletes the subscription. |
Parameters
pubsub.SubscriptionResult
Type | Field | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Retrieves the state of the completed subscription request and blocks until the request is complete. ⚠️ Activated when the option.WithWaitStatus() option is set. |
SubscriptionResult | Name() string | Name of the requested subscription. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionDelete(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.Delete(ctx, option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
fmt.Printf("Subscription deleted %s\n", result.Name())
return nil
}
Rewind subscription time
Rewinds the subscription to a specified point in time, allowing messages received after that time to be processed again.
Method
Type | Method | Description |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult | Rewinds the subscription time. |
Parameters
Type | Field | Description |
---|---|---|
time.Time | t | The point in time to which the subscription should be rewound. |
pubsub.SubscriptionResult
Type | Field | Description |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | Retrieves the state of the completed subscription request and blocks until the request is complete. ⚠️ Activated when the option.WithWaitStatus() option is set. |
SubscriptionResult | Name() string | Name of the requested subscription. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
Message
Publish message
Allows publishing a message to a topic.
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | Description |
---|---|---|
Message | msg | The message to be published. |
pubsub.Message
Type | Field | Description |
---|---|---|
string | Data | Message body. ⚠️ Must be Base64-encoded. |
map[string]string | Attributes | Message attributes in a [key, value] map format. ⚠️ Keys starting with kakaoc are not allowed. |
pubsub.PublishSettings
Type | Field | Description |
---|---|---|
time.Duration | DelayThreshold | Sends publish requests to the server at the specified interval. - Default: 100ms . |
int | CountThreshold | Collects and sends messages in batches of the specified size. - Default: 100 . |
int | NumGoroutines | Limits the number of concurrently running Go routines. - Default: 2 .- Max: 10 . |
time.Duration | Timeout | Sets the timeout for server requests. - Default: 60s . |
FlowControlSettings | FlowControlSettings | Controls the publishing rate through flow control. |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Specifies the maximum number of messages pending publication. - Default: 1000 . |
int | MaxOutstandingBytes | Specifies the maximum size of messages pending publication. - Default: 1G . |
LimitExceededBehavior | LimitExceededBehavior | Defines behavior when the limit is exceeded. - ignore : Disables flow control.- block (default): Waits until requests can be made without exceeding the limit (useful for batching processing where latency of individual requests is not critical).- signal error : Returns an error. |
pubsub.PublishResult
Type | Function | Description |
---|---|---|
string | Get(ctx context.Context) | Retrieves the message id of the message published to the server.- Blocks until the message is published. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
Receive message asynchronously
Receives messages from a subscription asynchronously.
This can only be used for pull-type subscriptions.
Method
Type | Method | Description |
---|---|---|
Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | Streams messages from the subscription and executes the callback function. |
Parameters
Type | Field | Description |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | Callback function to execute after receiving a message. |
pubsub.ReceiveSettings
Type | Field | Description |
---|---|---|
int | NumGoroutines | Limits the number of concurrently running Goroutines. - Default: 2 .- Max: 10 . |
FlowControlSettings | FlowControlSettings | Controls the publishing rate through flow control. |
pubsub.FlowControlSettings
Type | Field | Description |
---|---|---|
int | MaxOutstandingMessages | Specifies the maximum number of messages pending publication. - Default: 1000 . |
int | MaxOutstandingBytes | Specifies the maximum size of messages pending publication. - Default: 1G . |
LimitExceededBehavior | LimitExceededBehavior | Defines behavior when the limit is exceeded. - ignore : Disables flow control.- block (default): Waits until requests can be made without exceeding the limit (useful for batching processing where latency of individual requests is not critical).- signal error : Returns an error. |
pubsub.Message
Type | Function | Description |
---|---|---|
string | ID | Unique ID of the message. |
string | Data | Message body. |
map[string]string | Attributes | Message attributes. |
time.Time | PublishTime | Time when the message was published. |
*int | DeliveryAttempt | Number of times the message has been received. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}
Receive message synchronously
Receives messages from a subscription synchronously.
Method
Type | Method | Description |
---|---|---|
Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | Consumes messages from the subscription synchronously. - Supports batch message reception. |
Parameters
Type | Field | Description |
---|---|---|
int | maxMessages | Batch size. - Number of messages to receive at once. |
pubsub.PulledMessage
Identical to pubsub.Message
, but read-only.
Type | Function | Description |
---|---|---|
string | ID | Unique ID of the message. |
string | Data | Message body. |
map[string]string | Attributes | Message attributes. |
time.Time | PublishTime | Time when the message was published. |
*int | DeliveryAttempt | Number of times the message has been received. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}
Acknowledge message reception
Marks retrieved messages as acknowledged.
Method
Type | Method | Description |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | Notifies the subscription that messages have been received using their AckIDs. |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | AckIDs of the messages. |
pubsub.AcknowledgeResponse
Type | Field | Description |
---|---|---|
[]*AcknowledgeFailureResult | Failure | Information about failed requests. |
pubsub.AcknowledgeFailureResult
Type | Field | Description |
---|---|---|
string | AckID | The AckID of the failed request. |
error | Error | Reason for the failure. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
if err != nil {
// handle error
}
if resp != nil {
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
}
return nil
}
Modify message acknowledgment deadline
Modifies the acknowledgment deadline (Ack Deadline) for a message.
Extend
: Extends the message's usage time from the current point by the subscription's AckDeadline duration.Skip
: Sets the message's usage time to 0, allowing other clients to receive the message.
Method
Type | Method | Description |
---|---|---|
Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | Extends or reduces the usage period of a message using its AckID. |
Parameters
Type | Field | Description |
---|---|---|
[]string | ackIDs | AckIDs of the messages. |
AckAction | action | Indicates whether to extend the AckDeadline.pubsub.ActionExtend , pubsub.ActionSkip . |
pubsub.ModifyAckDeadlineResponse
Type | Field | Description |
---|---|---|
[]*ModifyAckDeadlineSuccessResult | Success | Information about successful requests. |
[]*ModifyAckDeadlineFailureResult | Failure | Information about failed requests. |
pubsub.ModifyAckDeadlineSuccessResult
Type | Field | Description |
---|---|---|
string | AckID | The original AckID. |
string | ReissuedAckID | A newly issued AckID after extension. |
pubsub.ModifyAckDeadlineFailureResult
Type | Field | Description |
---|---|---|
string | AckID | The AckID of the failed request. |
error | Error | Reason for the failure. |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
// Skip의 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend의 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}