메서드 목록
사전 작업 및 요구 사항
Pub/Sub Go SDK를 사용하기 위한 사전 작업 및 요구 사항은 SDK 개요 문서를 참고하시기 바랍니다.
kr-central-1 과 kr-central-2에서 지원하는 SDK 규격이 다르므르 리전별 규격을 확인해 주세요.
토픽 생성, 토픽 삭제, 서브스크립션 생성, 서브스크립션 삭제 API는 kr-central-2에서만 지원합니다.
토픽
토픽 조회
아래 항목을 이용하여 토픽을 조회할 수 있습니다.
- kr-central-1
- kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Topic | Topic(name string) | 실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음 |
bool | Exists(ctx context.Context) | 실제 토픽이 존재하는지 서버에서 조회 |
TopicConfig | Config(ctx context.Context) | 토픽의 정보를 조회 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
Method
Type | Method | 설명 |
---|---|---|
Client | Topic(name string) *Topic | 실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음 |
Topic | Exists(ctx context.Context) (bool, error) | 실제 토픽이 존재하는지 서버에서 조회 |
Topic | Config(ctx context.Context) (*TopicConfig, error) | 토픽의 정보를 조회 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicGet(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
exist, err := topic.Exists(ctx)
if err != nil {
return fmt.Errorf("topic.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found topic")
}
fmt.Printf("Topic: %v\n", topic)
tc, err := topic.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config: %v", err)
}
fmt.Printf("Topic config : %+v\n", tc)
return nil
}
토픽 목록 조회
프로젝트 내의 토픽 목록을 조회합니다.
- kr-central-1
- kr-central-2
Method
Type | Method |
---|---|
TopicIterator | Topics(ctx context.Context) |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
Method
Type | Method |
---|---|
Client | Topics(ctx context.Context) *TopicIterator |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Topics(ctx)
for {
topic, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Topic: %v\n", topic)
}
return nil
}
토픽의 서브스크립션 목록 조회
특정 토픽의 서브스크립션 목록을 조회합니다.
- kr-central-1
- kr-central-2
Method
Type | Method |
---|---|
SubscriptionIterator | Subscriptions(ctx context.Context) |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
Method
Type | Method |
---|---|
Topic | Subscriptions(ctx context.Context) *SubscriptionIterator |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"google.golang.org/api/iterator"
)
func topicSubscriptions(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
it := topic.Subscriptions(ctx)
for {
sub, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", sub)
}
return nil
}
토픽 수정
특정 토픽의 보존 주기 및 설명을 수정할 수 있습니다.
- kr-central-1
- kr-central-2
Method
Type | Method |
---|---|
TopicConfig | Update(ctx context.Context, cfg TopicConfigToUpdate) |
Parameters
Type | Field | 설명 |
---|---|---|
TopicConfigToUpdate | cfg | 수정할 토픽에 대한 정보를 담고 있는 객체 |
pubsub.TopicConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
string | Description | 토픽 설명 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
Method
Type | Method |
---|---|
Topic | Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error) |
Parameters
Type | Field | 설명 |
---|---|---|
TopicConfigToUpdate | cfg | 수정할 토픽에 대한 정보를 담고 있는 객체 |
pubsub.TopicConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
string | Description | 토픽 설명 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
cfg := &pubsub.TopicConfigToUpdate{
RetentionDuration: duration,
Description: "new description",
}
tc, err := topic.Update(ctx, cfg)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
return nil
}
토픽 생성
아래 항목을 이용하여 새로운 토픽을 생성할 수 있습니다.
- kr-central-1
- kr-central-2
kr-central-1에서는 지원하지 않습니다.
Method
Type | Method | 설명 |
---|---|---|
Client | CreateTopic(ctx context.Context, name string) (*Topic, error) | 메시지 보존 주기는 기본값(7일)으로 설정됨 |
Client | CreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error) |
Parameters
Type | Method | 설명 |
---|---|---|
TopicConfig | cfg | 생성할 토픽에 대한 정보를 담고 있는 객체 |
pubsub.TopicConfig
Type | Field | 설명 |
---|---|---|
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
string | Description | 토픽 설명 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicCreate(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic, err := client.CreateTopic(ctx, name)
if err != nil {
return fmt.Errorf("CreateTopic: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
config := pubsub.TopicConfig{
RetentionDuration: duration,
Description: "topic description",
}
topic, err := client.CreateTopicWithConfig(ctx, name, &config)
if err != nil {
return fmt.Errorf("CreateTopicWithConfig: %v", err)
}
fmt.Printf("Topic created: %v\n", topic)
return nil
}
토픽 삭제
아래 항목을 이용하여 토픽을 삭제할 수 있습니다.
토픽 삭제 시 하위 서브스크립션이 함께 삭제됩니다.
- kr-central-1
- kr-central-2
kr-central-1에서는 지원하지 않습니다.
Method
Type | Method |
---|---|
Topic | Delete(ctx context.Context) error |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicDelete(domainID, projectID, name string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
err = topic.Delete(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
return nil
}
서브스크립션
서브스크립션 조회
서브스크립션 유무를 확인합니다.
- kr-central-1 & kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Client | Subscription(name string) *Subscription | 실제 존재 여부와 상관없이 Subscription 객체를 반환 - ReceiveSettings 등의 설정을 할 수 있음 |
Subscription | Exists(ctx context.Context) (bool, error) | 실제 서브스크립션이 존재하는지 서버에서 조회 |
Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | 서브스크립션의 정보를 조회 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
exist, err := subscription.Exists(ctx)
if err != nil {
return fmt.Errorf("subscription.Exists: %v", err)
}
if !exist {
return fmt.Errorf("not found subscription")
}
fmt.Printf("Subscription: %v\n", subscription)
sc, err := subscription.Config(ctx)
if err != nil {
return fmt.Errorf("subscription.Config: %v", err)
}
fmt.Printf("Subscription config : %+v\n", sc)
return nil
}
서브스크립션 목록 조회
클라이언트에 설정된 도메인, 프로젝트에 속한 서브스크립션 목록을 조회합니다.
- kr-central-1 & kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Client | Subscriptions(ctx context.Context) *SubscriptionIterator | 실제 서브스크립션 정보를 가져올 수 있는 iterator를 반환 |
Parameters
파라미터는 없습니다.
Example
import (
"context"
"fmt"
"google.golang.org/api/iterator"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
it := client.Subscriptions(context.Background())
for {
subscription, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("Next: %v", err)
}
fmt.Printf("Subscription: %v\n", subscription)
}
return nil
}
서브스크립션 수정
서브스크립션을 수정합니다. 수정 시, 최소 하나의 항목을 포함해야 합니다.
- kr-central-1
- kr-central-2
Name | 설명 |
---|---|
AckDeadline | 기존 설정값과 달라야 함 - Min: 10 sec - Max: 600 sec |
MessageRetentionDuration | 기존 설정값과 달라야 함 - Min: 600 sec - Max: 604800 sec (7일) |
MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한( -1 ) |
PushConfig.Endpoint | Push Subscription인 경우, 기존 설정값과 달라야 함 |
PushConfig.BatchSize | Push Subscription인 경우, 기존 설정값과 달라야 함 |
Method
Type | Method | 설명 |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate) (*SubscriptionConfig, error) | SubscriptionConfigToUpdate 값에 따라 서브스크립션을 수정 |
Parameters
Type | Field | 설명 |
---|---|---|
*SubscriptionConfigToUpdate | cfg | 서브스크립션 수정에 필요한 설정값 |
pubsub.SubscriptionConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | AckDeadline | 메시지 응답 대기 시간 |
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
int | MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한 - Default: -1 (무한) |
*PushConfig | PushConfig | Push 서브스크립션의 설정값 |
pubsub.PushConfig
Type | Field | 설명 |
---|---|---|
string | Endpoint | 메시지를 전송할 목적지의 엔드포인트 |
int | BatchSize | 한 번에 보낼 메시지의 개수 - Default: 1 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:8888/push",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
updatedPushSubConfig, err := subscription.Update(context.Background(), &updateConfig)
fmt.Printf("Subscription updated config : %+v\n", updatedPushSubConfig)
return nil
}
Name | 설명 |
---|---|
AckDeadline | 기존 설정값과 달라야 함 - Min: 10 sec - Max: 600 sec |
MessageRetentionDuration | 기존 설정값과 달라야 함 - Min: 600 sec - Max: 604800 sec (7일) |
MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한(1~100 범위에서 횟수 지정 가능) - Default : -1 |
PushConfig.Endpoint | Push Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함 |
PushConfig.BatchSize | Push Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함 Default: 1 (무한) |
ObjectStorageConfig.Bucket | Object Storage Subscription인 경우에만 필수 |
ObjectStorageConfig.ExportIntervalMinutes | - Min: 1 min- Max: 10 min - Default: 5 min |
ObjectStorageConfig.FilePrefix | Object StorageSubscription인 경우, 기존 설정값과 달라야 함 |
ObjectStorageConfig.FileSuffix | Object StorageSubscription인 경우, 기존 설정값과 달라야 함 |
Method
Type | Method | 설명 |
---|---|---|
Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult | SubscriptionConfigToUpdate 값에 따라 서브스크립션을 수정 |
Parameters
Type | Field | 설명 |
---|---|---|
SubscriptionConfigToUpdate | cfg | 서브스크립션 수정에 필요한 설정값 |
pubsub.SubscriptionConfigToUpdate
Type | Field | 설명 |
---|---|---|
time.Duration | AckDeadline | 메시지 응답 대기 시간 |
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
int | MaxDeliveryAttempt | 재처리 횟수 - Min: 1 - Max: 무한(1~100 범위에서 횟수 지정 가능) - Default: -1 |
PushConfig | PushConfig | PushSubscription의 설정값 |
ObjectStorageConfig | ObjectStorageConfig | ObjectStorageSubscription의 설정값 |
pubsub.PushConfig
Type | Field | 설명 |
---|---|---|
string | Endpoint | 메시지를 전송할 목적지의 endpoint |
int | BatchSize | 한 번에 보낼 메시지의 개수 - Default: 1 |
pubsub.ObjectStorageConfig
Type | Field | 설명 |
---|---|---|
string | Bucket | Object Storage 버킷 이름 |
int | ExportIntervalMinutes | Object Storage로 적재하는 주기 - Default: 5 |
string | FilePrefix | Object Storage로 적재하는 file의 prefix |
string | FileSuffix | Object Storage로 적재하는 file의 suffix |
pubsub.SubscriptionResult
Type | Field | 설명 |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 |
SubscriptionResult | Name() string | 요청한 서브스크립션 이름 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionUpdate(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
updateConfig := &pubsub.SubscriptionConfigToUpdate{
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:8888/push",
BatchSize: 100,
},
AckDeadline: 30 * time.Second,
RetentionDuration: 48 * time.Hour,
MaxDeliveryAttempt: 20,
}
result := subscription.Update(context.Background(), updateConfig, option.WithWaitStatus())
success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Update: %v\n", err)
}
fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)
return nil
}
서브스크립션 시점 되돌리기
서브스크립션 시점을 지정한 시간으로 되돌리고, 지정 시간 이후에 메시지를 다시 받아볼 수 있습니다.
- kr-central-1
- kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time) error | 서브스크립션 시점 되돌리기 |
Parameters
Type | Field | 설명 |
---|---|---|
time.Time | t | 서브스크립션 시점을 되돌릴 시간 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
err = subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour))
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
Method
Type | Method | 설명 |
---|---|---|
Subscription | SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult | 서브스크립션 시점 되돌리기 |
Parameters
Type | Field | 설명 |
---|---|---|
time.Time | t | 서브스크립션 시점을 되돌릴 시간 |
pubsub.SubscriptionResult
Type | Field | 설명 |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 |
SubscriptionResult | Name() string | 요청한 서브스크립션 이름 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionSeek(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Seek: %v\n", err)
}
return nil
}
서브스크립션 생성
아래 항목을 이용하여 서브스크립션을 생성할 수 있습니다.
- kr-central-1
- kr-central-2
kr-central-1에서는 지원하지 않습니다.
Method
Type | Method | 설명 |
---|---|---|
Client | CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult | 메시지 보존 주기는 기본값(7일)으로 설정됨 |
Parameters
Type | Field | 설명 |
---|---|---|
SubscriptionConfig | cfg | 생성할 Subscription에 대한 정보를 담고 있는 객체 |
pubsub.SubscriptionConfig
Type | Field | 설명 |
---|---|---|
string | Topic | 토픽 이름 |
time.Duration | AckDeadline | 메시지 응답 대기 시간 |
time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s (7일) |
PushConfig | PushConfig | PushSubscription의 설정값 |
ObjectStorageConfig | ObjectStorageConfig | ObjectStorageSubscription의 설정값 |
int | MaxDeliveryAttempt | 재처리 횟수 Default: 1 (무한) |
pubsub.PushConfig
Type | Field | 설명 |
---|---|---|
string | Endpoint | 메시지를 전송할 목적지 endpoint |
int | BatchSize | 한번에 보낼 메시지의 개수 Default: 1 |
pubsub.ObjectStorageConfig
Type | Field | 설명 |
---|---|---|
string | Bucket | Object Storage 버킷 이름 |
int | ExportIntervalMinutes | Object Storage로 적재하는 주기 - Min: 1 min- Max: 10 min - Default: 5 min⚠️ Interval 내 최소 1번 파일이 생성되며, Interval 이전에도 파일이 생성될 수 있음 |
string | FilePrefix | Object Storage로 적재하는 파일의 Prefix |
string | FileSuffix | Object Storage로 적재하는 파일의 Suffix |
pubsub.SubscriptionResult
Type | Field | 설명 |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 |
SubscriptionResult | Name() string | 요청한 서브스크립션 이름 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
"time"
)
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
pullSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
}
pushSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
AckDeadline: 15 * time.Second,
RetentionDuration: 24 * time.Hour,
MaxDeliveryAttempt: 10,
PushConfig: &pubsub.PushConfig{
Endpoint: "http://localhost:9999/push",
BatchSize: 1,
},
}
objectStorageSubConfig := &pubsub.SubscriptionConfig{
Topic: topicName,
RetentionDuration: 24 * time.Hour,
ObjectStorageConfig: &pubsub.ObjectStorageConfig{
Bucket: "bucket",
ExportIntervalMinutes: 2,
FilePrefix: "prefix",
FileSuffix: "suffix",
},
}
result := client.CreateSubscription(ctx, pullSubName, pullSubConfig, option.WithWaitStatus())
success, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success :%s\n", result.Name(), status)
result = client.CreateSubscription(ctx, pushSubName, pushSubConfig, option.WithWaitStatus())
success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
result = client.CreateSubscription(ctx, objectStorageSubName, objectStorageSubConfig, option.WithWaitStatus())
success, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Create: %v\n", err)
}
fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
return nil
}
서브스크립션 삭제
아래 항목을 이용하여 서브스크립션을 삭제할 수 있습니다.
- kr-central-1
- kr-central-2
kr-central-1에서는 지원하지 않습니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult | 서브스크립션 삭제 |
Parameters
pubsub.SubscriptionResult
Type | Field | 설명 |
---|---|---|
SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 |
SubscriptionResult | Name() string | 요청한 서브스크립션 이름 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionDelete(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
result := subscription.Delete(ctx, option.WithWaitStatus())
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("Delete: %v\n", err)
}
fmt.Printf("Subscription deleted %s\n", result.Name())
return nil
}
메시지
메시지 게시
토픽에 메시지를 게시할 수 있습니다.
- kr-central-1
- kr-central-2
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | 설명 |
---|---|---|
Message | msg | 게시할 메시지 |
pubsub.Message
Type | Field | 설명 |
---|---|---|
string | Data | 메시지 본문 ⚠️ Base64로 인코딩이 되어있어야 합니다. |
map[string]string | Attributes | 메시지 속성 [key, value] 형식의 map ⚠️ kakaoc 로 시작하는 key는 사용할 수 없습니다. |
pubsub.PublishSettings
Type | Field | 설명 |
---|---|---|
time.Duration | DelayThreshold | 설정한 시간마다 서버로 게시 요청을 전송 |
int | CountThreshold | 설정한 개수만큼 메시지를 모아서 전송 |
int | NumGoroutines | 동시에 실행되는 go routine 개수를 제한 |
time.Duration | Timeout | 서버로의 요청 타임아웃 시간을 설정 |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 수신 속도 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 |
int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore - block - signal error (default) |
pubsub.PublishResult
Type | Function | 설명 |
---|---|---|
string | Get(ctx context.Context) | 실제 서버에 게시된 message id 정보를 가져옴 - 서버에 게시될 때까지 블로킹됨 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg))
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
Method
Type | Method |
---|---|
PublishResult | Publish(ctx context.Context, msg *Message) |
Parameters
Type | Field | 설명 |
---|---|---|
Message | msg | 게시할 메시지 |
pubsub.Message
Type | Field | 설명 |
---|---|---|
string | Data | 메시지 본문 ⚠️ Base64로 인코딩이 되어있어야 합니다. |
map[string]string | Attributes | 메시지 속성 [key, value] 형식의 map ⚠️ kakaoc 로 시작하는 key는 사용할 수 없습니다. |
pubsub.PublishSettings
Type | Field | 설명 |
---|---|---|
time.Duration | DelayThreshold | 설정한 시간마다 서버로 게시 요청을 전송 - Default: 100ms |
int | CountThreshold | 설정한 개수만큼 메시지를 모아서 전송 - Default: 100 |
int | NumGoroutines | 동시에 실행되는 go routine 개수를 제한 - Default: 2 - Max: 10 |
time.Duration | Timeout | 서버로의 요청 타임아웃 시간을 설정 - Default: 60s |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 - Default: 1000 |
int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore : FlowContol을 사용하지 않음 - block (default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함 - signal error : error 반환 |
pubsub.PublishResult
Type | Function | 설명 |
---|---|---|
string | Get(ctx context.Context) | 실제 서버에 게시된 message id 정보를 가져옴 - 서버에 게시될 때까지 블로킹됨 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub: NewClient: %v", err)
}
defer client.Close()
topic := client.Topic(name)
result := topic.Publish(ctx, &pubsub.Message{
Data: base64.StdEncoding.EncodeToString([]byte(msg)),
Attributes: attributes,
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("pubsub: result.Get: %v", err)
}
fmt.Printf("Published a message; msg ID: %v\n", id)
return nil
}
메시지 비동기 수신
서브스크립션의 메시지를 비동기로 수신합니다.
- kr-central-1
- kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | 서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행 |
Parameters
Type | Field | 설명 |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | 메시지를 수신 후 실행할 콜백 함수 |
pubsub.ReceiveSettings
Type | Field | 설명 |
---|---|---|
int | NumGoroutines | 동시에 실행되는 Goroutine 개수를 제한 - Default: 2 - Max: 10 |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도를 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 동시에 처리될 수 있는 메시지의 최대 개수를 지정 - Default: 1000 |
int | MaxOutstandingBytes | 동시에 처리될 수 있는 메시지의 최대 크기를 지정 - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore - block (default)- signal error |
pubsub.Message
Type | Function | 설명 |
---|---|---|
string | ID | 메시지의 고유 ID |
string | Data | 메시지 본문 |
map[string]string | Attributes | 메시지 속성 |
time.Time | PublishTime | 메시지가 게시된 시간 |
*int | DeliveryAttempt | 메시지 수신 횟수 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}
Pull 타입의 서브스크립션인 경우에만 사용 가능합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | 서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행 |
Parameters
Type | Field | 설명 |
---|---|---|
func(ctx2 context.Context, message *Message) | callbackFunc | 메시지를 수신 후 실행할 콜백 함수 |
pubsub.ReceiveSettings
Type | Field | 설명 |
---|---|---|
int | NumGoroutines | 동시에 실행되는 Goroutine 개수를 제한 - Default: 2 - Max: 10 |
FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도를 제어 |
pubsub.FlowControlSettings
Type | Field | 설명 |
---|---|---|
int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 - Default: 1000 |
int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 - Default: 1G |
LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore : FlowContol을 사용하지 않음 - block (default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함 - signal error : error 반환 |
pubsub.Message
Type | Function | 설명 |
---|---|---|
string | ID | 메시지의 고유 ID |
string | Data | 메시지 본문 |
map[string]string | Attributes | 메시지 속성 |
time.Time | PublishTime | 메시지가 게시된 시간 |
*int | DeliveryAttempt | 메시지 수신 횟수 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
message.Ack()
})
return nil
}
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionReceive(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
subscription.ReceiveSettings.NumGoroutines = 10
err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
// Process message
ackResult := message.AckWithResult()
status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
})
return nil
}
메시지 동기 수신
서브스크립션의 메시지를 동기로 수신합니다.
- kr-central-1 & kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | 서브스크립션의 메시지를 동기로 소비 - 배치로 메시지 수신 가능 |
Parameters
Type | Field | 설명 |
---|---|---|
int | maxMessages | 배치 사이즈 - 한 번에 수신할 메시지의 수 |
pubsub.PulledMessage
pubsub.Message와 동일하지만, 읽기(Read)만 가능합니다.
Type | Function | 설명 |
---|---|---|
string | ID | 메시지의 고유 ID |
string | Data | 메시지 본문 |
map[string]string | Attributes | 메시지 속성 |
time.Time | PublishTime | 메시지가 게시된 시간 |
*int | DeliveryAttempt | 메시지 수신 횟수 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
maxMessages := 10
msgs, err := subscription.Pull(ctx, maxMessages)
for i, msg := range msgs {
fmt.Printf("message [%v] : %v\n", i, msg)
}
return nil
}
메시지 수신 확인
가져온 메시지를 확인 처리합니다.
- kr-central-1
- kr-central-2
Method
Type | Method | 설명 |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | 메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션측에 알림 |
Parameters
Type | Field | 설명 |
---|---|---|
[]string | ackIDs | 메시지의 AckID |
pubsub.AcknowledgeResponse
Type | Field | 설명 |
---|---|---|
[]*AcknowledgeFailureResult | Failure | 요청 실패한 정보 |
pubsub.AcknowledgeFailureResult
Type | Field | 설명 |
---|---|---|
string | AckID | 요청 실패한 AckID |
error | Error | 요청 실패에 대한 이유 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
return nil
}
Method
Type | Method | 설명 |
---|---|---|
Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | 메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션에 알림 |
Parameters
Type | Field | 설명 |
---|---|---|
[]string | ackIDs | 메시지의 AckID |
pubsub.AcknowledgeResponse
Type | Field | 설명 |
---|---|---|
[]*AcknowledgeFailureResult | Failure | 요청 실패한 정보 |
pubsub.AcknowledgeFailureResult
Type | Field | 설명 |
---|---|---|
string | AckID | 요청 실패한 AckID |
error | Error | 요청 실패에 대한 이유 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
resp, err := subscription.Acknowledge(ctx, ackIDs)
if err != nil {
// handle error
}
if resp != nil {
for _, failure := range resp.Failure {
fmt.Printf("ack fail : %v\n", failure)
}
}
return nil
}
메시지 확인 기한 수정
메시지 확인 기한(Ack Deadline)을 수정합니다.
- kr-central-1 & kr-central-2
Extend
: 메시지의 사용 시간을 현시점부터 서브스크립션의 AckDeadline 시간만큼 연장할 수 있습니다.Skip
: 메시지의 사용 시간을 0으로 설정하여, 다른 클라이언트에서 메시지를 수신할 수 있게 합니다.
Method
Type | Method | 설명 |
---|---|---|
Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | 메시지의 AckID를 사용하여 메시지의 사용 기간을 늘리거나 줄임 |
Parameters
Type | Field | 설명 |
---|---|---|
[]string | ackIDs | 메시지의 AckID |
AckAction | action | 메시지의 AckDeadline 연장 여부pubsub.ActionExtend, pubsub.ActionSkip |
pubsub.ModifyAckDeadlineResponse
Type | Field | 설명 |
---|---|---|
[]*ModifyAckDeadlineSuccessResult | Success | 요청 성공한 정보 |
[]*ModifyAckDeadlineFailureResult | Failure | 요청 실패한 정보 |
pubsub.ModifyAckDeadlineSuccessResult
Type | Field | 설명 |
---|---|---|
string | AckID | 기존 AckID |
string | ReissuedAckID | 연장하면서 새로 발급된 AckID |
pubsub.ModifyAckDeadlineFailureResult
Type | Field | 설명 |
---|---|---|
string | AckID | 요청 실패한 AckID |
error | Error | 요청 실패에 대한 이유 |
Example
import (
"context"
"fmt"
"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
ctx := context.Background()
client, err := pubsub.NewClient(ctx, domainID, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
subscription := client.Subscription(subName)
// Skip 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
// Extend 예시
resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
if err != nil {
return fmt.Errorf("ModifyAckDeadline: %v", err)
}
return nil
}