본문으로 건너뛰기

메서드 목록

안내

토픽, 서브스크립션의 생성/삭제는 카카오클라우드 콘솔에서만 가능합니다.

사전 작업 및 요구 사항

Pub/Sub Go SDK를 사용하기 위한 사전 작업 및 요구 사항은 개요 문서를 참고하시기 바랍니다.

토픽

토픽 조회

아래 항목을 이용하여 토픽을 조회할 수 있습니다.

Method
TypeMethod설명
TopicTopic(name string)실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음
boolExists(ctx context.Context)실제 토픽이 존재하는지 서버에서 조회
TopicConfigConfig(ctx context.Context)토픽의 정보를 조회
Parameters

파라미터는 없습니다.

Example
func (c *Client) Topic(name string) *Topic
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)

tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)

return nil
}

토픽 목록 조회

프로젝트 내의 토픽 목록을 조회합니다.

Method
TypeMethod
TopicIteratorTopics(ctx context.Context)
Parameters

파라미터는 없습니다.

Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}

토픽의 서브스크립션 목록 조회

특정 토픽의 서브스크립션 목록을 조회합니다.

Method
TypeMethod
SubscriptionIteratorSubscriptions(ctx context.Context)
Parameters

파라미터는 없습니다.

Example
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)

func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)

it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}

토픽 수정

특정 토픽의 보존 주기 및 설명을 수정할 수 있습니다.

Method
TypeMethod
TopicConfigUpdate(ctx context.Context, cfg TopicConfigToUpdate)
Parameters
TypeField설명
TopicConfigToUpdatecfg수정할 토픽에 대한 정보를 담고 있는 객체

pubsub.TopicConfigToUpdate

TypeField설명
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
stringDescription토픽 설명
Example
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)

cfg := pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}

Subscription

서브스크립션 조회

서브스크립션 유무를 확인합니다.

Method
TypeMethod설명
ClientSubscription(name string) *Subscription실제 존재 여부와 상관없이 Subscription 객체를 반환
- ReceiveSettings 등의 설정을 할 수 있음
SubscriptionExists(ctx context.Context) (bool, error)실제 서브스크립션이 존재하는지 서버에서 조회
SubscriptionConfig(ctx context.Context) (*SubscriptionConfig, error)서브스크립션의 정보를 조회
Parameters

파라미터는 없습니다.

Example
func (c *Client) Subscription(name string) *Subscription
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)

sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}

서브스크립션 목록 조회

클라이언트에 설정된 Domain, Project에 속한 서브스크립션 목록을 조회합니다.

Method
TypeMethod설명
ClientSubscriptions(ctx context.Context) *SubscriptionIterator실제 서브스크립션 정보를 가져올 수 있는 iterator를 반환
Parameters

파라미터는 없습니다.

Example
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}

서브스크립션 수정

서브스크립션을 수정합니다. 수정 시, 최소 하나의 항목을 포함해야 합니다.

Name설명
AckDeadline기존 설정값과 달라야 함
- Min: 10 sec
- Max: 600 sec
MessageRetentionDuration기존 설정값과 달라야 함
- Min: 600 sec
- Max: 604800 sec (7일)
MaxDeliveryAttempt재처리 횟수
- Min: 1
- Max: 무한(-1)
PushConfig.EndpointPush Subscription인 경우, 기존 설정값과 달라야 함
PushConfig.BatchSizePush Subscription인 경우, 기존 설정값과 달라야 함
Method
TypeMethod설명
SubscriptionUpdate(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error)SubscriptionConfigToUpdate값에 따라 서브스크립션을 수정
Parameters
TypeField설명
*SubscriptionConfigToUpdatecfg서브스크립션 수정에 필요한 설정값

pubsub.SubscriptionConfigToUpdate

TypeField설명
time.DurationAckDeadline메시지 응답 대기 시간
time.DurationRetentionDuration메시지 보존 주기
- Default: 604,800s(7일)
intMaxDeliveryAttempt재처리 횟수
- Min: 1
- Max: 무한
- Default: -1 (무한)
*PushConfigPushConfigPush 서브스크립션의 설정값

pubsub.PushConfig

TypeField설명
stringEndpoint메시지를 전송할 목적지의 엔드포인트
intBatchSize한 번에 보낼 메시지의 개수
- Default: 1
Example
func (s *Subscription) Update(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
updateConfig := pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost/push:8888",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
updatedPushSubConfig, err := subscription.Update(context.Background(), &updateConfig)
fmt.Printf("Subscription updated config : %+v\n", updatedPushSubConfig)

return nil
}

서브스크립션 시점 되돌리기

서브스크립션 시점을 지정한 시간으로 되돌리고, 지정 시간 이후에 메시지를 다시 받아볼 수 있습니다.

Method
TypeMethod설명
SubscriptionSeekToTime(ctx context.Context, t time.Time) error서브스크립션 시점 되돌리기
Parameters
TypeField설명
time.Timet서브스크립션 시점을 되돌릴 시간
Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)

func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
err = subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour))
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}

Message

메시지 게시

토픽에 메시지를 게시할 수 있습니다.

Method
TypeMethod
PublishResultPublish(ctx context.Context, msg *Message)
Parameters
TypeField설명
Messagemsg게시할 메시지

pubsub.Message

TypeField설명
stringData메시지 본문
⚠️ Base64로 인코딩이 되어있어야 합니다.
map[string]stringAttributes메시지 속성
[key, value] 형식의 map
⚠️ kakaoc로 시작하는 key는 사용할 수 없습니다.

pubsub.PublishSettings

TypeField설명
time.DurationDelayThreshold설정한 시간마다 서버로 게시 요청을 전송
intCountThreshold설정한 개수만큼 메시지를 모아서 전송
intNumGoroutines동시에 실행되는 go routine 개수를 제한
time.DurationTimeout서버로의 요청 타임아웃 시간을 설정
FlowControlSettingsFlowControlSettings흐름 제어를 통해 수신 속도 제어

pubsub.FlowControlSettings

TypeField설명
intMaxOutstandingMessages아직 게시되지 않은 메시지의 최대 개수를 지정
intMaxOutstandingBytes아직 게시되지 않은 메시지의 최대 크기를 지정
LimitExceededBehaviorLimitExceededBehavior제한을 초과했을 때, 동작을 정의
- ignore
- block
- signal error(default)

pubsub.PublishResult

TypeFunction설명
stringGet(ctx context.Context)실제 서버에 게시된 message id 정보를 가져옴
- 서버에 게시될 때까지 블로킹됨
Example
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()

topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg))
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}

메시지 비동기 수신

서브스크립션의 메시지를 비동기로 수신합니다.

Method
TypeMethod설명
SubscriptionReceive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행
Parameters
TypeField설명
func(ctx2 context.Context, message *Message)callbackFunc메시지를 수신 후 실행할 콜백 함수

pubsub.ReceiveSettings

TypeField설명
intNumGoroutines동시에 실행되는 Goroutine 개수를 제한
- Default: 2
- Max: 10
FlowControlSettingsFlowControlSettings흐름 제어를 통해 게시 속도를 제어

pubsub.FlowControlSettings

TypeField설명
intMaxOutstandingMessages동시에 처리될 수 있는 메시지의 최대 개수를 지정
- Default: 1000
intMaxOutstandingBytes동시에 처리될 수 있는 메시지의 최대 크기를 지정
- Default: 1G
LimitExceededBehaviorLimitExceededBehavior제한을 초과했을 때, 동작을 정의
- ignore
- block(default)
- signal error

pubsub.Message

TypeFunction설명
stringID메시지의 고유 ID
stringData메시지 본문
map[string]stringAttributes메시지 속성
time.TimePublishTime메시지가 게시된 시간
*intDeliveryAttempt메시지 수신 횟수
Example
func (s *Subscription) Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}

메시지 동기 수신

서브스크립션의 메시지를 동기로 수신합니다.

Method
TypeMethod설명
SubscriptionPull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)서브스크립션의 메시지를 동기로 소비
- 배치로 메시지 수신 가능
Parameters
TypeField설명
intmaxMessages배치 사이즈
- 한 번에 수신할 메시지의 수

pubsub.PulledMessage

pubsub.Message와 동일하지만, 읽기(Read)만 가능합니다.

TypeFunction설명
stringID메시지의 고유 ID
stringData메시지 본문
map[string]stringAttributes메시지 속성
time.TimePublishTime메시지가 게시된 시간
*intDeliveryAttempt메시지 수신 횟수
Example
func (s *Subscription) Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}

메시지 수신 확인

가져온 메시지를 확인 처리합니다.

Method
TypeMethod설명
SubscriptionAcknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션측에 알림
Parameters
TypeField설명
[]stringackIDs메시지의 AckID

pubsub.AcknowledgeResponse

TypeField설명
[]*AcknowledgeFailureResultFailure요청 실패한 정보

pubsub.AcknowledgeFailureResult

TypeField설명
stringAckID요청 실패한 AckID
errorError요청 실패에 대한 이유
Example
func (s *Subscription) Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

resp, err := subscription.Acknowledge(ctx, ackIDs)
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
return nil
}

메시지 확인 기한 수정

메시지 확인 기한(Ack Deadline)을 수정합니다.

  • Extend: 메시지의 사용 시간을 현시점부터 서브스크립션의 AckDeadline 시간만큼 연장할 수 있습니다.
  • Skip: 메시지의 사용 시간을 0으로 설정하여, 다른 클라이언트에서 메시지를 수신할 수 있게 합니다.
Method
TypeMethod설명
SubscriptionModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction)
(*ModifyAckDeadlineResponse, error)
메시지의 AckID를 사용하여 메시지의 사용 기간을 늘리거나 줄임
Parameters
TypeField설명
[]stringackIDs메시지의 AckID
AckActionaction메시지의 AckDeadline 연장 여부
pubsub.ActionExtend, pubsub.ActionSkip

pubsub.ModifyAckDeadlineResponse

TypeField설명
[]*ModifyAckDeadlineSuccessResultSuccess요청 성공한 정보
[]*ModifyAckDeadlineFailureResultFailure요청 실패한 정보

pubsub.ModifyAckDeadlineSuccessResult

TypeField설명
stringAckID기존 AckID
stringReissuedAckID연장하면서 새로 발급된 AckID

pubsub.ModifyAckDeadlineFailureResult

TypeField설명
stringAckID요청 실패한 AckID
errorError요청 실패에 대한 이유
Example
func (s *Subscription) ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error)
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)

func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()

subscription := client.Subscription(subName)

// Skip 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}