본문으로 건너뛰기

메서드 목록

사전 작업 및 요구 사항

Pub/Sub Go SDK를 사용하기 위한 사전 작업 및 요구 사항은 SDK 개요 문서를 참고하시기 바랍니다.

안내

kr-central-1 과 kr-central-2에서 지원하는 SDK 규격이 다르므르 리전별 규격을 확인해 주세요.
토픽 생성, 토픽 삭제, 서브스크립션 생성, 서브스크립션 삭제 API는 kr-central-2에서만 지원합니다.

토픽

토픽 조회

아래 항목을 이용하여 토픽을 조회할 수 있습니다.

Method
TypeMethod설명
ClientTopic(name string) *Topic실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음
TopicExists(ctx context.Context) (bool, error)실제 토픽이 존재하는지 서버에서 조회
TopicConfig(ctx context.Context) (*TopicConfig, error)토픽의 정보를 조회
Parameters

파라미터는 없습니다.

Example
func (c *Client) Topic(name string) *Topic
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)

tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)

return nil
}

토픽 목록 조회

프로젝트 내의 토픽 목록을 조회합니다.

Method
TypeMethod
ClientTopics(ctx context.Context) *TopicIterator
Parameters

파라미터는 없습니다.

Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}

토픽의 서브스크립션 목록 조회

특정 토픽의 서브스크립션 목록을 조회합니다.

Method
TypeMethod
TopicSubscriptions(ctx context.Context) *SubscriptionIterator
Parameters

파라미터는 없습니다.

Example
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)

it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}

토픽 수정

특정 토픽의 보존 주기 및 설명을 수정할 수 있습니다.

Method
TypeMethod
TopicUpdate(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
Parameters
TypeField설명
TopicConfigToUpdatecfg수정할 토픽에 대한 정보를 담고 있는 객체

pubsub.TopicConfigToUpdate

TypeField설명
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
stringDescription토픽 설명
Example
func (t *Topic) Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)

cfg := &pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}

토픽 생성

아래 항목을 이용하여 새로운 토픽을 생성할 수 있습니다.

Method
TypeMethod설명
ClientCreateTopic(ctx context.Context, name string) (*Topic, error)메시지 보존 주기는 기본값(7일)으로 설정됨
ClientCreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error)
Parameters
TypeMethod설명
TopicConfigcfg생성할 토픽에 대한 정보를 담고 있는 객체

pubsub.TopicConfig

TypeField설명
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
stringDescription토픽 설명
Example
func (c *Client) CreateTopic(ctx context.Context, name string) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicCreate(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic, err := client.CreateTopic(ctx, name)
if err != nil {
return fmt.Errorf("CreateTopic: %v", err)
}

fmt.Printf("Topic created: %v\n", topic)
return nil
}
func (c *Client) CreateTopicWithConfig(ctx context.Context, name string, tc *TopicConfig) (*Topic, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

config := pubsub.TopicConfig{
RetentionDuration: duration,
Description: "topic description",
}
topic, err := client.CreateTopicWithConfig(ctx, name, &config)
if err != nil {
return fmt.Errorf("CreateTopicWithConfig: %v", err)
}

fmt.Printf("Topic created: %v\n", topic)
return nil
}

토픽 삭제

아래 항목을 이용하여 토픽을 삭제할 수 있습니다.

주의

토픽 삭제 시 하위 서브스크립션이 함께 삭제됩니다.

Method
TypeMethod
TopicDelete(ctx context.Context) error
Parameters

파라미터는 없습니다.

Example
func (t *Topic) Delete(ctx context.Context) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicDelete(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)

err = topic.Delete(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
return nil
}

서브스크립션

서브스크립션 조회

서브스크립션 유무를 확인합니다.

Method
TypeMethod설명
ClientSubscription(name string) *Subscription실제 존재 여부와 상관없이 Subscription 객체를 반환
- ReceiveSettings 등의 설정을 할 수 있음
SubscriptionExists(ctx context.Context) (bool, error)실제 서브스크립션이 존재하는지 서버에서 조회
SubscriptionConfig(ctx context.Context) (*SubscriptionConfig, error)서브스크립션의 정보를 조회
Parameters

파라미터는 없습니다.

Example
func (c *Client) Subscription(name string) *Subscription
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)

sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}

서브스크립션 목록 조회

클라이언트에 설정된 도메인, 프로젝트에 속한 서브스크립션 목록을 조회합니다.

Method
TypeMethod설명
ClientSubscriptions(ctx context.Context) *SubscriptionIterator실제 서브스크립션 정보를 가져올 수 있는 iterator를 반환
Parameters

파라미터는 없습니다.

Example
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}

서브스크립션 수정

서브스크립션을 수정합니다. 수정 시, 최소 하나의 항목을 포함해야 합니다.

Name설명
AckDeadline기존 설정값과 달라야 함
- Min: 10 sec
- Max: 600 sec
MessageRetentionDuration기존 설정값과 달라야 함
- Min: 600 sec
- Max: 604800 sec (7일)
MaxDeliveryAttempt재처리 횟수
- Min: 1
- Max: 무한(1~100 범위에서 횟수 지정 가능)
- Default : -1
PushConfig.EndpointPush Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함
PushConfig.BatchSizePush Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함
Default: 1(무한)
ObjectStorageConfig.BucketObject Storage Subscription인 경우에만 필수
ObjectStorageConfig.ExportIntervalMinutes- Min: 1 min
- Max: 10 min
- Default: 5 min
ObjectStorageConfig.FilePrefixObject StorageSubscription인 경우, 기존 설정값과 달라야 함
ObjectStorageConfig.FileSuffixObject StorageSubscription인 경우, 기존 설정값과 달라야 함
Method
TypeMethod설명
SubscriptionUpdate(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResultSubscriptionConfigToUpdate값에 따라 서브스크립션을 수정
Parameters
TypeField설명
SubscriptionConfigToUpdatecfg서브스크립션 수정에 필요한 설정값

pubsub.SubscriptionConfigToUpdate

TypeField설명
time.DurationAckDeadline메시지 응답 대기 시간
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
intMaxDeliveryAttempt재처리 횟수
- Min: 1
- Max: 무한(1~100 범위에서 횟수 지정 가능)
- Default: -1
PushConfigPushConfigPushSubscription의 설정값
ObjectStorageConfigObjectStorageConfigObjectStorageSubscription의 설정값

pubsub.PushConfig

TypeField설명
stringEndpoint메시지를 전송할 목적지의 endpoint
intBatchSize한 번에 보낼 메시지의 개수
- Default: 1

pubsub.ObjectStorageConfig

TypeField설명
stringBucketObject Storage 버킷 이름
intExportIntervalMinutesObject Storage로 적재하는 주기
- Default: 5
stringFilePrefixObject Storage로 적재하는 file의 prefix
stringFileSuffixObject Storage로 적재하는 file의 suffix

pubsub.SubscriptionResult

TypeField설명
SubscriptionResultGet(ctx context.Context) (string, error)요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking
⚠️ option.WithWaitStatus() 옵션 설정 시 활성화
SubscriptionResultName() string요청한 서브스크립션 이름
Example
func (s *Subscription) Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
updateConfig := &pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:8888/push",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
result := subscription.Update(context.Background(), updateConfig, option.WithWaitStatus())

success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)

return nil
}

서브스크립션 시점 되돌리기

서브스크립션 시점을 지정한 시간으로 되돌리고, 지정 시간 이후에 메시지를 다시 받아볼 수 있습니다.

Method
TypeMethod설명
SubscriptionSeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult서브스크립션 시점 되돌리기
Parameters
TypeField설명
time.Timet서브스크립션 시점을 되돌릴 시간

pubsub.SubscriptionResult

TypeField설명
SubscriptionResultGet(ctx context.Context) (string, error)요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking
⚠️ option.WithWaitStatus() 옵션 설정 시 활성화
SubscriptionResultName() string요청한 서브스크립션 이름
Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())

_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}

서브스크립션 생성

아래 항목을 이용하여 서브스크립션을 생성할 수 있습니다.

Method
TypeMethod설명
ClientCreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult메시지 보존 주기는 기본값(7일)으로 설정됨
Parameters
TypeField설명
SubscriptionConfigcfg생성할 Subscription에 대한 정보를 담고 있는 객체

pubsub.SubscriptionConfig

TypeField설명
stringTopic토픽 이름
time.DurationAckDeadline메시지 응답 대기 시간
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
PushConfigPushConfigPushSubscription의 설정값
ObjectStorageConfigObjectStorageConfigObjectStorageSubscription의 설정값
intMaxDeliveryAttempt재처리 횟수 Default: 1(무한)

pubsub.PushConfig

TypeField설명
stringEndpoint메시지를 전송할 목적지 endpoint
intBatchSize한번에 보낼 메시지의 개수 Default: 1

pubsub.ObjectStorageConfig

TypeField설명
stringBucketObject Storage 버킷 이름
intExportIntervalMinutesObject Storage로 적재하는 주기
- Min: 1 min
- Max: 10 min
- Default: 5 min

⚠️ Interval 내 최소 1번 파일이 생성되며, Interval 이전에도 파일이 생성될 수 있음
stringFilePrefixObject Storage로 적재하는 파일의 Prefix
stringFileSuffixObject Storage로 적재하는 파일의 Suffix

pubsub.SubscriptionResult

TypeField설명
SubscriptionResultGet(ctx context.Context) (string, error)요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking
⚠️ option.WithWaitStatus() 옵션 설정 시 활성화
SubscriptionResultName() string요청한 서브스크립션 이름
Example
func (c *Client) CreateSubscription(ctx context.Context, name string, cfg SubscriptionConfig, opts ...option.SubscriptionOption) (*SubscriptionResult)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

pullSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
}

pushSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:9999/push",
BatchSize: 1,
},
}

objectStorageSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
RetentionDuration: 24 * time.Hour,
ObjectStorageConfig: &pubsub.ObjectStorageConfig{
Bucket: "bucket",
ExportIntervalMinutes: 2,
FilePrefix: "prefix",
FileSuffix: "suffix",
},
}
result := client.CreateSubscription(ctx, pullSubName, pullSubConfig, option.WithWaitStatus())

success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success :%s\n", result.Name(), status)

result = client.CreateSubscription(ctx, pushSubName, pushSubConfig, option.WithWaitStatus())

success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)

result = client.CreateSubscription(ctx, objectStorageSubName, objectStorageSubConfig, option.WithWaitStatus())

success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
return nil
}

서브스크립션 삭제

아래 항목을 이용하여 서브스크립션을 삭제할 수 있습니다.

Method
TypeMethod설명
SubscriptionDelete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult서브스크립션 삭제
Parameters

pubsub.SubscriptionResult

TypeField설명
SubscriptionResultGet(ctx context.Context) (string, error)요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking
⚠️ option.WithWaitStatus() 옵션 설정 시 활성화
SubscriptionResultName() string요청한 서브스크립션 이름
Example
func (s *Subscription) Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionDelete(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
result := subscription.Delete(ctx, option.WithWaitStatus())

_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
fmt.Printf("Subscription deleted %s\n", result.Name())
return nil
}

메시지

메시지 게시

토픽에 메시지를 게시할 수 있습니다.

Method
TypeMethod
PublishResultPublish(ctx context.Context, msg *Message)
Parameters
TypeField설명
Messagemsg게시할 메시지

pubsub.Message

TypeField설명
stringData메시지 본문
⚠️ Base64로 인코딩이 되어있어야 합니다.
map[string]stringAttributes메시지 속성
[key, value] 형식의 map
⚠️ kakaoc로 시작하는 key는 사용할 수 없습니다.

pubsub.PublishSettings

TypeField설명
time.DurationDelayThreshold설정한 시간마다 서버로 게시 요청을 전송
- Default: 100ms
intCountThreshold설정한 개수만큼 메시지를 모아서 전송
- Default: 100
intNumGoroutines동시에 실행되는 go routine 개수를 제한
- Default: 2
- Max: 10
time.DurationTimeout서버로의 요청 타임아웃 시간을 설정
- Default: 60s
FlowControlSettingsFlowControlSettings흐름 제어를 통해 게시 속도 제어

pubsub.FlowControlSettings

TypeField설명
intMaxOutstandingMessages아직 게시되지 않은 메시지의 최대 개수를 지정
- Default: 1000
intMaxOutstandingBytes아직 게시되지 않은 메시지의 최대 크기를 지정
- Default: 1G
LimitExceededBehaviorLimitExceededBehavior제한을 초과했을 때, 동작을 정의
- ignore : FlowContol을 사용하지 않음
- block(default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함
- signal error : error 반환

pubsub.PublishResult

TypeFunction설명
stringGet(ctx context.Context)실제 서버에 게시된 message id 정보를 가져옴
- 서버에 게시될 때까지 블로킹됨
Example
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}

메시지 비동기 수신

서브스크립션의 메시지를 비동기로 수신합니다.

주의

Pull 타입의 서브스크립션인 경우에만 사용 가능합니다.

Method
TypeMethod설명
SubscriptionReceive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행
Parameters
TypeField설명
func(ctx2 context.Context, message *Message)callbackFunc메시지를 수신 후 실행할 콜백 함수

pubsub.ReceiveSettings

TypeField설명
intNumGoroutines동시에 실행되는 Goroutine 개수를 제한
- Default: 2
- Max: 10
FlowControlSettingsFlowControlSettings흐름 제어를 통해 게시 속도를 제어

pubsub.FlowControlSettings

TypeField설명
intMaxOutstandingMessages아직 게시되지 않은 메시지의 최대 개수를 지정
- Default: 1000
intMaxOutstandingBytes아직 게시되지 않은 메시지의 최대 크기를 지정
- Default: 1G
LimitExceededBehaviorLimitExceededBehavior제한을 초과했을 때, 동작을 정의
- ignore : FlowContol을 사용하지 않음
- block(default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함
- signal error : error 반환

pubsub.Message

TypeFunction설명
stringID메시지의 고유 ID
stringData메시지 본문
map[string]stringAttributes메시지 속성
time.TimePublishTime메시지가 게시된 시간
*intDeliveryAttempt메시지 수신 횟수
Example
func (s *Subscription) Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}

메시지 동기 수신

서브스크립션의 메시지를 동기로 수신합니다.

Method
TypeMethod설명
SubscriptionPull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)서브스크립션의 메시지를 동기로 소비
- 배치로 메시지 수신 가능
Parameters
TypeField설명
intmaxMessages배치 사이즈
- 한 번에 수신할 메시지의 수

pubsub.PulledMessage

pubsub.Message와 동일하지만, 읽기(Read)만 가능합니다.

TypeFunction설명
stringID메시지의 고유 ID
stringData메시지 본문
map[string]stringAttributes메시지 속성
time.TimePublishTime메시지가 게시된 시간
*intDeliveryAttempt메시지 수신 횟수
Example
func (s *Subscription) Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}

메시지 수신 확인

가져온 메시지를 확인 처리합니다.

Method
TypeMethod설명
SubscriptionAcknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션에 알림
Parameters
TypeField설명
[]stringackIDs메시지의 AckID

pubsub.AcknowledgeResponse

TypeField설명
[]*AcknowledgeFailureResultFailure요청 실패한 정보

pubsub.AcknowledgeFailureResult

TypeField설명
stringAckID요청 실패한 AckID
errorError요청 실패에 대한 이유
Example
func (s *Subscription) Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

resp, err := subscription.Acknowledge(ctx, ackIDs)
if err != nil {
// handle error
}
if resp != nil {
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
}
return nil
}

메시지 확인 기한 수정

메시지 확인 기한(Ack Deadline)을 수정합니다.

  • Extend: 메시지의 사용 시간을 현시점부터 서브스크립션의 AckDeadline 시간만큼 연장할 수 있습니다.
  • Skip: 메시지의 사용 시간을 0으로 설정하여, 다른 클라이언트에서 메시지를 수신할 수 있게 합니다.
Method
TypeMethod설명
SubscriptionModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction)
(*ModifyAckDeadlineResponse, error)
메시지의 AckID를 사용하여 메시지의 사용 기간을 늘리거나 줄임
Parameters
TypeField설명
[]stringackIDs메시지의 AckID
AckActionaction메시지의 AckDeadline 연장 여부
pubsub.ActionExtend, pubsub.ActionSkip

pubsub.ModifyAckDeadlineResponse

TypeField설명
[]*ModifyAckDeadlineSuccessResultSuccess요청 성공한 정보
[]*ModifyAckDeadlineFailureResultFailure요청 실패한 정보

pubsub.ModifyAckDeadlineSuccessResult

TypeField설명
stringAckID기존 AckID
stringReissuedAckID연장하면서 새로 발급된 AckID

pubsub.ModifyAckDeadlineFailureResult

TypeField설명
stringAckID요청 실패한 AckID
errorError요청 실패에 대한 이유
Example
func (s *Subscription) ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

// Skip 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}