메서드 목록
토픽, 서브스크립션의 생성/삭제는 카카오클라우드 콘솔에서만 가능합니다.
사전 작업 및 요구 사항
Pub/Sub Go SDK를 사용하기 위한 사전 작업 및 요구 사항은 개요 문서를 참고하시기 바랍니다.
토픽
토픽 조회
아래 항목을 이용하여 토픽을 조회할 수 있습니다.
Method
Type | Method | 설명 |
---|---|---|
Topic | Topic(name string) | 실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음 |
bool | Exists(ctx context.Context) | 실제 토픽이 존재하는지 서버에서 조회 |
TopicConfig | Config(ctx context.Context) | 토픽의 정보를 조회 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
토픽 목록 조회
프로젝트 내의 토픽 목록을 조회합니다.
Method
Type | Method |
---|---|
TopicIterator | Topics(ctx context.Context) |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
토픽의 서브스크립션 목록 조회
특정 토픽의 서브스크립션 목록을 조회합니다.
Method
Type | Method |
---|---|
SubscriptionIterator | Subscriptions(ctx context.Context) |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
토픽 수정
특정 토픽의 보존 주기 및 설명을 수정할 수 있습니다.
Method
Type | Method |
---|---|
TopicConfig | Update(ctx context.Context, cfg TopicConfigToUpdate) |
Parameters
Type | Field | 설명 |
---|---|---|
TopicConfigToUpdate | cfg | 수정할 토픽에 대한 정보를 담고 있는 객체 |
pubsub.TopicConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
string | Description | 토픽 설명 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
Subscription
서브스크립션 조회
서브스크립션 유무를 확인합니다.
Method
Type | Method | 설명 |
---|---|---|
Client | Subscription(name string) *Subscription | 실제 존재 여부와 상관없이 Subscription 객체를 반환 - ReceiveSettings 등의 설정을 할 수 있음 |
Subscription | Exists(ctx context.Context) (bool, error) | 실제 서브스크립션이 존재하는지 서버에서 조회 |
Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | 서브스크립션의 정보를 조회 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)
sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}
서브스크립션 목록 조회
클라이언트에 설정된 Domain, Project에 속한 서브스크립션 목록을 조회합니다.
Method
Type | Method | 설명 |
---|---|---|
Client | Subscriptions(ctx context.Context) *SubscriptionIterator | 실제 서브스크립션 정보를 가져올 수 있는 iterator를 반환 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}
서브스크립션 수정
서브스크립션을 수정합니다. 수정 시, 최소 하나의 항목을 포함해야 합니다.
Name | 설명 |
---|---|
AckDeadline | 기존 설정값과 달라야 함 - Min: 10 sec - Max: 600 sec |
MessageRetentionDuration | 기존 설정값과 달라야 함 - Min: 600 sec - Max: 604800 sec (7일) |
MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한( -1 ) |
PushConfig.Endpoint | Push Subscription인 경우, 기존 설정값과 달라야 함 |
PushConfig.BatchSize | Push Subscription인 경우, 기존 설정값과 달라야 함 |
Method
Type | Method | 설명 |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error) | SubscriptionConfigToUpdate 값에 따라 서브스크립션을 수정 |
Parameters
Type | Field | 설명 |
---|---|---|
*SubscriptionConfigToUpdate | cfg | 서브스크립션 수정에 필요한 설정값 |
pubsub.SubscriptionConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | AckDeadline | 메시지 응답 대기 시간 |
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
int | MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한 - Default: -1 (무한) |
*PushConfig | PushConfig | Push 서브스크립션의 설정값 |
pubsub.PushConfig
Type | Field | 설명 |
---|---|---|
string | Endpoint | 메시지를 전송할 목적지의 엔드포인트 |
int | BatchSize | 한 번에 보낼 메시지의 개수 - Default: 1 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost/push:8888",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
updatedPushSubConfig, err := subscription.Update(context.Background(), &updateConfig)
fmt.Printf("Subscription updated config : %+v\n", updatedPushSubConfig)
return nil
}
서브스크립션 시점 되돌리기
서브스크립션 시점을 지정한 시간으로 되돌리고, 지정 시간 이후에 메시지를 다시 받아볼 수 있습니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time) error | 서브스크립션 시점 되돌리기 |
Parameters
Type | Field | 설명 |
---|---|---|
time.Time | t | 서브스크립션 시점을 되돌릴 시간 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
err = subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour))
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
Message
메시지 게시
토픽에 메시지를 게시할 수 있습니다.
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | 설명 |
---|---|---|
Message | msg | 게시할 메시지 |
pubsub.Message
Type | Field | 설명 |
---|---|---|
string | Data | 메시지 본문 ⚠️ Base64로 인코딩이 되어있어야 합니다. |
map[string]string | Attributes | 메시지 속성 [key, value] 형식의 map ⚠️ kakaoc 로 시작하는 key는 사용할 수 없습니다. |
pubsub.PublishSettings
Type | Field | 설명 |
---|---|---|
time.Duration | DelayThreshold | 설정한 시간마다 서버로 게시 요청을 전송 |
int | CountThreshold | 설정한 개수만큼 메시지를 모아서 전송 |
int | NumGoroutines | 동시에 실행되는 go routine 개수를 제한 |
time.Duration | Timeout | 서버로의 요청 타임아웃 시간을 설정 |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 수신 속도 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 |
int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore - block - signal error (default) |
pubsub.PublishResult
Type | Function | 설명 |
---|---|---|
string | Get(ctx context.Context) | 실제 서버에 게시된 message id 정보를 가져옴 - 서버에 게시될 때까지 블로킹됨 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg))
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
메시지 비동기 수신
서브스크립션의 메시지를 비동기로 수신합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | 서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행 |
Parameters
Type | Field | 설명 |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | 메시지를 수신 후 실행할 콜백 함수 |
pubsub.ReceiveSettings
Type | Field | 설명 |
---|---|---|
int | NumGoroutines | 동시에 실행되는 Goroutine 개수를 제한 - Default: 2 - Max: 10 |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도를 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 동시에 처리될 수 있는 메시지의 최대 개수를 지정 - Default: 1000 |
int | MaxOutstandingBytes | 동시에 처리될 수 있는 메시지의 최대 크기를 지정 - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore - block (default)- signal error |
pubsub.Message
Type | Function | 설명 |
---|---|---|
string | ID | 메시지의 고유 ID |
string | Data | 메시지 본문 |
map[string]string | Attributes | 메시지 속성 |
time.Time | PublishTime | 메시지가 게시된 시간 |
*int | DeliveryAttempt | 메시지 수신 횟수 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}
메시지 동기 수신
서브스크립션의 메시지를 동기로 수신합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | 서브스크립션의 메시지를 동기로 소비 - 배치로 메시지 수신 가능 |
Parameters
Type | Field | 설명 |
---|---|---|
int | maxMessages | 배치 사이즈 - 한 번에 수신할 메시지의 수 |
pubsub.PulledMessage
pubsub.Message와 동일하지만, 읽기(Read)만 가능합니다.
Type | Function | 설명 |
---|---|---|
string | ID | 메시지의 고유 ID |
string | Data | 메시지 본문 |
map[string]string | Attributes | 메시지 속성 |
time.Time | PublishTime | 메시지가 게시된 시간 |
*int | DeliveryAttempt | 메시지 수신 횟수 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}
메시지 수신 확인
가져온 메시지를 확인 처리합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | 메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션측에 알림 |
Parameters
Type | Field | 설명 |
---|---|---|
[]string | ackIDs | 메시지의 AckID |
pubsub.AcknowledgeResponse
Type | Field | 설명 |
---|---|---|
[]*AcknowledgeFailureResult | Failure | 요청 실패한 정보 |
pubsub.AcknowledgeFailureResult
Type | Field | 설명 |
---|---|---|
string | AckID | 요청 실패한 AckID |
error | Error | 요청 실패에 대한 이유 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
return nil
}
메시지 확인 기한 수정
메시지 확인 기한(Ack Deadline)을 수정합니다.
Extend
: 메시지의 사용 시간을 현시점부터 서브스크립션의 AckDeadline 시간만큼 연장할 수 있습니다.Skip
: 메시지의 사용 시간을 0으로 설정하여, 다른 클라이언트에서 메시지를 수신할 수 있게 합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | 메시지의 AckID를 사용하여 메시지의 사용 기간을 늘리거나 줄임 |
Parameters
Type | Field | 설명 |
---|---|---|
[]string | ackIDs | 메시지의 AckID |
AckAction | action | 메시지의 AckDeadline 연장 여부pubsub.ActionExtend, pubsub.ActionSkip |
pubsub.ModifyAckDeadlineResponse
Type | Field | 설명 |
---|---|---|
[]*ModifyAckDeadlineSuccessResult | Success | 요청 성공한 정보 |
[]*ModifyAckDeadlineFailureResult | Failure | 요청 실패한 정보 |
pubsub.ModifyAckDeadlineSuccessResult
Type | Field | 설명 |
---|---|---|
string | AckID | 기존 AckID |
string | ReissuedAckID | 연장하면서 새로 발급된 AckID |
pubsub.ModifyAckDeadlineFailureResult
Type | Field | 설명 |
---|---|---|
string | AckID | 요청 실패한 AckID |
error | Error | 요청 실패에 대한 이유 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
// Skip 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}