메서드 목록
사전 작업 및 요구 사항
Pub/Sub Go SDK를 사용하기 위한 사전 작업 및 요구 사항은 SDK 개요 문서를 참고하시기 바랍니다.
토픽
토픽 조회
아래 항목을 이용하여 토픽을 조회할 수 있습니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Client | Topic(name string) *Topic | 실제 존재 여부와 상관없이 토픽 객체를 반환하며, Publish Setting 등의 설정을 할 수 있음 | 
| Topic | Exists(ctx context.Context) (bool, error) | 실제 토픽이 존재하는지 서버에서 조회 | 
| Topic | Config(ctx context.Context) (*TopicConfig, error) | 토픽의 정보를 조회 | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func topicGet(domainID, projectID, name string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    topic := client.Topic(name)
    exist, err := topic.Exists(ctx)
    if err != nil {
        return fmt.Errorf("topic.Exists: %v", err)
    }
    if !exist {
        return fmt.Errorf("not found topic")
    }
    fmt.Printf("Topic: %v\n", topic)
 
    tc, err := topic.Config(ctx)
    if err != nil {
        return fmt.Errorf("topic.Config: %v", err)
    }
    fmt.Printf("Topic config : %+v\n", tc)
 
    return nil
}
토픽 목록 조회
프로젝트 내의 토픽 목록을 조회합니다.
Method
| Type | Method | 
|---|---|
| Client | Topics(ctx context.Context) *TopicIterator | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "google.golang.org/api/iterator"
)
 
func topicList(domainID, projectID string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    it := client.Topics(ctx)
    for {
        topic, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return fmt.Errorf("Next: %v", err)
        }
        fmt.Printf("Topic: %v\n", topic)
    }
    return nil
}
토픽의 서브스크립션 목록 조회
특정 토픽의 서브스크립션 목록을 조회합니다.
Method
| Type | Method | 
|---|---|
| Topic | Subscriptions(ctx context.Context) *SubscriptionIterator | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "google.golang.org/api/iterator"
)
 
func topicSubscriptions(domainID, projectID, name string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    topic := client.Topic(name)
 
    it := topic.Subscriptions(ctx)
    for {
        sub, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return fmt.Errorf("Next: %v", err)
        }
       fmt.Printf("Subscription: %v\n", sub)
    }
    return nil
}
토픽 수정
특정 토픽의 보존 주기 및 설명을 수정할 수 있습니다.
Method
| Type | Method | 
|---|---|
| Topic | Update(ctx context.Context, cfg *TopicConfigToUpdate) (*TopicConfig, error) | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| TopicConfigToUpdate | cfg | 수정할 토픽에 대한 정보를 담고 있는 객체 | 
pubsub.TopicConfigToUpdate
| Type | Field | 설명 | 
|---|---|---|
| time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s(7일) | 
| string | Description | 토픽 설명 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "time"
)
 
func topicUpdate(domainID, projectID, name string, duration time.Duration) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    topic := client.Topic(name)
 
    cfg := &pubsub.TopicConfigToUpdate{
        RetentionDuration: duration,
        Description: "new description",
    }
    tc, err := topic.Update(ctx, cfg)
    if err != nil {
        return fmt.Errorf("Update: %v\n", err)
    }
    fmt.Printf("Topic Retention Duration: %s\n", tc.RetentionDuration)
    return nil
}
토픽 생성
아래 항목을 이용하여 새로운 토픽을 생성할 수 있습니다.
| Type | Method | 설명 | 
|---|---|---|
| Client | CreateTopic(ctx context.Context, name string) (*Topic, error) | 메시지 보존 주기는 기본값(7일)으로 설정됨 | 
| Client | CreateTopicWithConfig(ctx context.Context, name string, cfg *TopicConfig) (*Topic, error) | 
Parameters
| Type | Method | 설명 | 
|---|---|---|
| TopicConfig | cfg | 생성할 토픽에 대한 정보를 담고 있는 객체 | 
pubsub.TopicConfig
| Type | Field | 설명 | 
|---|---|---|
| time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s(7일) | 
| string | Description | 토픽 설명 | 
Example
import (
   "context"
   "fmt"
   "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func topicCreate(domainID, projectID, name string) error {
   ctx := context.Background()
   client, err := pubsub.NewClient(ctx, domainID, projectID)
   if err != nil {
      return fmt.Errorf("pubsub.NewClient: %v", err)
   }
   defer client.Close()
 
   topic, err := client.CreateTopic(ctx, name)
   if err != nil {
      return fmt.Errorf("CreateTopic: %v", err)
   }
 
   fmt.Printf("Topic created: %v\n", topic)
   return nil
}
import (
   "context"
   "fmt"
   "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
   "time"
)
 
func topicCreateWithConfig(domainID, projectID, name string, duration time.Duration) error {
   ctx := context.Background()
   client, err := pubsub.NewClient(ctx, domainID, projectID)
   if err != nil {
      return fmt.Errorf("pubsub.NewClient: %v", err)
   }
   defer client.Close()
 
   config := pubsub.TopicConfig{
      RetentionDuration: duration,
      Description: "topic description",
   }
   topic, err := client.CreateTopicWithConfig(ctx, name, &config)
   if err != nil {
      return fmt.Errorf("CreateTopicWithConfig: %v", err)
   }
 
   fmt.Printf("Topic created: %v\n", topic)
   return nil
}
토픽 삭제
아래 항목을 이용하여 토픽을 삭제할 수 있습니다.
토픽 삭제 시 하위 서브스크립션이 함께 삭제됩니다. 
Method
| Type | Method | 
|---|---|
| Topic | Delete(ctx context.Context) error | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func topicDelete(domainID, projectID, name string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    topic := client.Topic(name)
 
    err = topic.Delete(ctx)
    if err != nil {
        return fmt.Errorf("Delete: %v\n", err)
    }
    return nil
}
서브스크립션
서브스크립션 조회
서브스크립션 유무를 확인합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Client | Subscription(name string) *Subscription | 실제 존재 여부와 상관없이 Subscription 객체를 반환 - ReceiveSettings 등의 설정을 할 수 있음 | 
| Subscription | Exists(ctx context.Context) (bool, error) | 실제 서브스크립션이 존재하는지 서버에서 조회 | 
| Subscription | Config(ctx context.Context) (*SubscriptionConfig, error) | 서브스크립션의 정보를 조회 | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionGet(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
    subscription := client.Subscription(subName)
    exist, err := subscription.Exists(ctx)
    if err != nil {
        return fmt.Errorf("subscription.Exists: %v", err)
    }
    if !exist {
        return fmt.Errorf("not found subscription")
    }
    fmt.Printf("Subscription: %v\n", subscription)
    sc, err := subscription.Config(ctx)
    if err != nil {
        return fmt.Errorf("subscription.Config: %v", err)
    }
    fmt.Printf("Subscription config : %+v\n", sc)
    return nil
}
서브스크립션 목록 조회
클라이언트에 설정된 도메인, 프로젝트에 속한 서브스크립션 목록을 조회합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Client | Subscriptions(ctx context.Context) *SubscriptionIterator | 실제 서브스크립션 정보를 가져올 수 있는 iterator를 반환 | 
Parameters
파라미터는 없습니다.
Example
import (
    "context"
    "fmt"
    "google.golang.org/api/iterator"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionList(domainID, projectID string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
    it := client.Subscriptions(context.Background())
    for {
        subscription, err := it.Next()
        if err == iterator.Done {
            break
        }
        if err != nil {
            return fmt.Errorf("Next: %v", err)
        }
        fmt.Printf("Subscription: %v\n", subscription)
    }
    return nil
}
서브스크립션 수정
서브스크립션을 수정합니다. 수정 시, 최소 하나의 항목을 포함해야 합니다.
| Name | 설명 | 
|---|---|
| AckDeadline | 기존 설정값과 달라야 함 - Min: 10 sec - Max: 600 sec | 
| MessageRetentionDuration | 기존 설정값과 달라야 함 - Min: 600 sec - Max: 604800 sec (7일) | 
| MaxDeliveryAttempt | 재처리 횟수 - Min: 1- Max: 무한(1~100 범위에서 횟수 지정 가능) - Default : -1 | 
| PushConfig.Endpoint | Push Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함 | 
| PushConfig.BatchSize | Push Subscription인 경우에만 필수이며, 기존 설정값과 달라야 함 Default: 1(무한) | 
| ObjectStorageConfig.Bucket | Object Storage Subscription인 경우에만 필수 | 
| ObjectStorageConfig.ExportIntervalMinutes | - Min: 1min- Max: 10min- Default: 5min | 
| ObjectStorageConfig.FilePrefix | Object StorageSubscription인 경우, 기존 설정값과 달라야 함 | 
| ObjectStorageConfig.FileSuffix | Object StorageSubscription인 경우, 기존 설정값과 달라야 함 | 
| ObjectStorageConfig.ChannelCount | Object StorageSubscription인 경우, 기존 설정값과 달라야 함 | 
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | Update(ctx context.Context, cfg *SubscriptionConfigToUpdate, opts ...option.SubscriptionOption) *SubscriptionResult | SubscriptionConfigToUpdate값에 따라 서브스크립션을 수정 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionConfigToUpdate | cfg | 서브스크립션 수정에 필요한 설정값 | 
pubsub.SubscriptionConfigToUpdate
| Type | Field | 설명 | 
|---|---|---|
| time.Duration | AckDeadline | 메시지 응답 대기 시간 | 
| time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s(7일) | 
| int | MaxDeliveryAttempt | 재처리 횟수 - Min: 1- Max: 무한(1~100 범위에서 횟수 지정 가능) - Default: -1 | 
| PushConfig | PushConfig | PushSubscription의 설정값 | 
| ObjectStorageConfig | ObjectStorageConfig | ObjectStorageSubscription의 설정값 | 
pubsub.PushConfig
| Type | Field | 설명 | 
|---|---|---|
| string | Endpoint | 메시지를 전송할 목적지의 endpoint | 
| int | BatchSize | 한 번에 보낼 메시지의 개수 - Default: 1 | 
pubsub.ObjectStorageConfig
| Type | Field | 설명 | 
|---|---|---|
| string | Bucket | Object Storage 버킷 이름 | 
| int | ExportIntervalMinutes | Object Storage로 적재하는 주기 - Default: 5 | 
| string | FilePrefix | Object Storage로 적재하는 file의 prefix | 
| string | FileSuffix | Object Storage로 적재하는 file의 suffix | 
| int | ChannelCount | 내보내기 채널 개수 | 
pubsub.SubscriptionResult
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 | 
| SubscriptionResult | Name() string | 요청한 서브스크립션 이름 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "time"
)  
 
func subscriptionUpdate(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
    updateConfig := &pubsub.SubscriptionConfigToUpdate{
        PushConfig: &pubsub.PushConfig{
            Endpoint:  "http://localhost:8888/push",
            BatchSize: 100,
        },
        AckDeadline:        30 * time.Second,
        RetentionDuration:  48 * time.Hour,
        MaxDeliveryAttempt: 20,
    }
    result := subscription.Update(context.Background(), updateConfig, option.WithWaitStatus())
 
    success, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Update: %v\n", err)
    }
    fmt.Printf("Subscription updated %s, success: %s\n", result.Name(), status)
 
    return nil
}
서브스크립션 시점 되돌리기
서브스크립션 시점을 지정한 시간으로 되돌리고, 지정 시간 이후에 메시지를 다시 받아볼 수 있습니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | SeekToTime(ctx context.Context, t time.Time, opts ...option.SubscriptionOption) *SubscriptionResult | 서브스크립션 시점 되돌리기 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| time.Time | t | 서브스크립션 시점을 되돌릴 시간 | 
pubsub.SubscriptionResult
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 | 
| SubscriptionResult | Name() string | 요청한 서브스크립션 이름 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "time"
)
 
func subscriptionSeek(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
    result := subscription.SeekToTime(ctx, time.Now().Add(-1*time.Hour), option.WithWaitStatus())
     
    _, err = result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Seek: %v\n", err)
    }
    return nil
}
서브스크립션 생성
아래 항목을 이용하여 서브스크립션을 생성할 수 있습니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Client | CreateSubscription(ctx context.Context, name string, cfg *SubscriptionConfig, opts ...option.SubscriptionOption) *SubscriptionResult | 메시지 보존 주기는 기본값(7일)으로 설정됨 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionConfig | cfg | 생성할 Subscription에 대한 정보를 담고 있는 객체 | 
pubsub.SubscriptionConfig
| Type | Field | 설명 | 
|---|---|---|
| string | Topic | 토픽 이름 | 
| time.Duration | AckDeadline | 메시지 응답 대기 시간 | 
| time.Duration | RetentionDuration | 메시지 보존 주기 - Default: 604,800s(7일) | 
| PushConfig | PushConfig | PushSubscription의 설정값 | 
| ObjectStorageConfig | ObjectStorageConfig | ObjectStorageSubscription의 설정값 | 
| int | MaxDeliveryAttempt | 재처리 횟수 Default: 1(무한) | 
pubsub.PushConfig
| Type | Field | 설명 | 
|---|---|---|
| string | Endpoint | 메시지를 전송할 목적지 endpoint | 
| int | BatchSize | 한 번에 보낼 메시지의 개수 Default: 1 | 
pubsub.ObjectStorageConfig
| Type | Field | 설명 | 
|---|---|---|
| string | Bucket | Object Storage 버킷 이름 | 
| int | ExportIntervalMinutes | Object Storage로 적재하는 주기 - Min: 1min- Max: 10min- Default: 5min⚠️ Interval 내 최소 1번 파일이 생성되며, Interval 이전에도 파일이 생성될 수 있음 | 
| string | FilePrefix | Object Storage로 적재하는 파일의 Prefix | 
| string | FileSuffix | Object Storage로 적재하는 파일의 Suffix | 
| int | ChannelCount | 내보내기 채널 개수 | 
pubsub.SubscriptionResult
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 | 
| SubscriptionResult | Name() string | 요청한 서브스크립션 이름 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
    "time"
) 
 
func subscriptionCreate(domainID, projectID, topicName, pullSubName, pushSubName, objectStorageSubName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    pullSubConfig := &pubsub.SubscriptionConfig{
        Topic:              topicName,
        AckDeadline:        15 * time.Second,
        RetentionDuration:  24 * time.Hour,
        MaxDeliveryAttempt: 10,
    }
 
    pushSubConfig := &pubsub.SubscriptionConfig{
        Topic:              topicName,
        AckDeadline:        15 * time.Second,
        RetentionDuration:  24 * time.Hour,
        MaxDeliveryAttempt: 10,
        PushConfig: &pubsub.PushConfig{
            Endpoint:  "http://localhost:9999/push",
            BatchSize: 1,
        },
    }
 
    objectStorageSubConfig := &pubsub.SubscriptionConfig{
        Topic:              topicName,
        RetentionDuration:  24 * time.Hour,
        ObjectStorageConfig: &pubsub.ObjectStorageConfig{
            Bucket:  "bucket",
            ExportIntervalMinutes: 2,
            FilePrefix: "prefix",
            FileSuffix: "suffix",
        },
    }
    result := client.CreateSubscription(ctx, pullSubName, pullSubConfig, option.WithWaitStatus())
 
    success, err := result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Create: %v\n", err)
    }
    fmt.Printf("Subscription created: %s, success :%s\n", result.Name(), status)
     
    result = client.CreateSubscription(ctx, pushSubName, pushSubConfig, option.WithWaitStatus())
     
    success, err = result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Create: %v\n", err)
    }
    fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
 
    result = client.CreateSubscription(ctx, objectStorageSubName, objectStorageSubConfig, option.WithWaitStatus())
     
    success, err = result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Create: %v\n", err)
    }
    fmt.Printf("Subscription created: %s, success: %s\n", result.Name(), status)
    return nil
}
서브스크립션 삭제
아래 항목을 이용하여 서브스크립션을 삭제할 수 있습니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | Delete(ctx context.Context, opts ...option.SubscriptionOption) *SubscriptionResult | 서브스크립션 삭제 | 
Parameters
pubsub.SubscriptionResult
| Type | Field | 설명 | 
|---|---|---|
| SubscriptionResult | Get(ctx context.Context) (string, error) | 요청이 완료된 Subscription 상태를 가져올 수 있으며, 요청이 완료될 때까지 blocking ⚠️ option.WithWaitStatus() 옵션 설정 시 활성화 | 
| SubscriptionResult | Name() string | 요청한 서브스크립션 이름 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func subscriptionDelete(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
    result := subscription.Delete(ctx, option.WithWaitStatus())
 
    _, err = result.Get(ctx)
    if err != nil {
        return fmt.Errorf("Delete: %v\n", err)
    }
    fmt.Printf("Subscription deleted %s\n", result.Name())
    return nil
}
메시지
메시지 게시
토픽에 메시지를 게시할 수 있습니다.
Method
| Type | Method | 
|---|---|
| PublishResult | Publish(ctx context.Context, msg *Message) | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| Message | msg | 게시할 메시지 | 
pubsub.Message
| Type | Field | 설명 | 
|---|---|---|
| string | Data | 메시지 본문 ⚠️ Base64로 인코딩이 되어있어야 합니다. | 
| map[string]string | Attributes | 메시지 속성 [key, value] 형식의 map ⚠️ kakaoc로 시작하는 key는 사용할 수 없습니다. | 
pubsub.PublishSettings
| Type | Field | 설명 | 
|---|---|---|
| time.Duration | DelayThreshold | 설정한 시간마다 서버로 게시 요청을 전송 - Default: 100ms | 
| int | CountThreshold | 설정한 개수만큼 메시지를 모아서 전송 - Default: 100 | 
| int | NumGoroutines | 동시에 실행되는 go routine 개수를 제한 - Default: 2- Max: 10 | 
| time.Duration | Timeout | 서버로의 요청 타임아웃 시간을 설정 - Default: 60s | 
| FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도 제어 | 
pubsub.FlowControlSettings
| Type | Field | 설명 | 
|---|---|---|
| int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 - Default: 1000 | 
| int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 - Default: 1G | 
| LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore: FlowContol을 사용하지 않음- block(default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함- signal error: error 반환 | 
pubsub.PublishResult
| Type | Function | 설명 | 
|---|---|---|
| string | Get(ctx context.Context) | 실제 서버에 게시된 message id 정보를 가져옴 - 서버에 게시될 때까지 블로킹됨 | 
Example
import (
	"context"
	"fmt"
	"github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func topicPublish(domainID, projectID, name, msg string, attributes map[string]string) error {
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, domainID, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %v", err)
	}
	defer client.Close()
	topic := client.Topic(name)
	result := topic.Publish(ctx, &pubsub.Message{
		Data:       base64.StdEncoding.EncodeToString([]byte(msg)),
		Attributes: attributes,
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %v", err)
	}
	fmt.Printf("Published a message; msg ID: %v\n", id)
	return nil
}
메시지 비동기 수신
서브스크립션의 메시지를 비동기로 수신합니다.
Pull 타입의 서브스크립션인 경우에만 사용 가능합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | Receive(ctx context.Context, callbackFunc func(ctx2 context.Context, message *Message)) error | 서브스크립션의 메시지를 스트리밍으로 수신하고, 콜백 함수를 실행 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| func(ctx2 context.Context, message *Message) | callbackFunc | 메시지를 수신 후 실행할 콜백 함수 | 
pubsub.ReceiveSettings
| Type | Field | 설명 | 
|---|---|---|
| int | NumGoroutines | 동시에 실행되는 Goroutine 개수를 제한 - Default: 2- Max: 10 | 
| FlowControlSettings | FlowControlSettings | 흐름 제어를 통해 게시 속도를 제어 | 
pubsub.FlowControlSettings
| Type | Field | 설명 | 
|---|---|---|
| int | MaxOutstandingMessages | 아직 게시되지 않은 메시지의 최대 개수를 지정 - Default: 1000 | 
| int | MaxOutstandingBytes | 아직 게시되지 않은 메시지의 최대 크기를 지정 - Default: 1G | 
| LimitExceededBehavior | LimitExceededBehavior | 제한을 초과했을 때, 동작을 정의 - ignore: FlowContol을 사용하지 않음- block(default) : 제한을 초과하지 않고 요청할 수 있을 때까지 대기 (⚠️ 개별 요청의 대기 시간(latency)이 중요하지 않은 Batching Processing에서 유용함- signal error: error 반환 | 
pubsub.Message
| Type | Function | 설명 | 
|---|---|---|
| string | ID | 메시지의 고유 ID | 
| string | Data | 메시지 본문 | 
| map[string]string | Attributes | 메시지 속성 | 
| time.Time | PublishTime | 메시지가 게시된 시간 | 
| *int | DeliveryAttempt | 메시지 수신 횟수 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func subscriptionReceive(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
    subscription.ReceiveSettings.NumGoroutines = 10
    subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
        // Process message
        message.Ack()
    })
    return nil
}
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func subscriptionReceive(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
 
    subscription.ReceiveSettings.NumGoroutines = 10
    err = subscription.Receive(ctx, func(ctx2 context.Context, message *pubsub.Message) {
        // Process message
        ackResult := message.AckWithResult()
        status, err := ackResult.Get(ctx2) //blocks until acknowledge is done
    })
    return nil
}
메시지 동기 수신
서브스크립션의 메시지를 동기로 수신합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | Pull(ctx context.Context, maxMessages int) ([]*PulledMessage, error) | 서브스크립션의 메시지를 동기로 소비 - 배치로 메시지 수신 가능 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| int | maxMessages | 배치 사이즈 - 한 번에 수신할 메시지의 수 | 
pubsub.PulledMessage
pubsub.Message와 동일하지만, 읽기(Read)만 가능합니다.
| Type | Function | 설명 | 
|---|---|---|
| string | ID | 메시지의 고유 ID | 
| string | Data | 메시지 본문 | 
| map[string]string | Attributes | 메시지 속성 | 
| time.Time | PublishTime | 메시지가 게시된 시간 | 
| *int | DeliveryAttempt | 메시지 수신 횟수 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionPull(domainID, projectID, subName string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
    subscription := client.Subscription(subName)
    maxMessages := 10
    msgs, err := subscription.Pull(ctx, maxMessages)
    for i, msg := range msgs {
        fmt.Printf("message [%v] : %v\n", i, msg)
    }
    return nil
}
메시지 수신 확인
가져온 메시지를 확인 처리합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | Acknowledge(ctx context.Context, ackIDs []string) (*AcknowledgeResponse, error) | 메시지의 AckID를 사용하여 메시지를 수신하였다는 것을 서브스크립션에 알림 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| []string | ackIDs | 메시지의 AckID | 
pubsub.AcknowledgeResponse
| Type | Field | 설명 | 
|---|---|---|
| []*AcknowledgeFailureResult | Failure | 요청 실패한 정보 | 
pubsub.AcknowledgeFailureResult
| Type | Field | 설명 | 
|---|---|---|
| string | AckID | 요청 실패한 AckID | 
| error | Error | 요청 실패에 대한 이유 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
 
func subscriptionAck(domainID, projectID, subName string, ackIDs []string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
 
    subscription := client.Subscription(subName)
 
    resp, err := subscription.Acknowledge(ctx, ackIDs)
    if err != nil {
        // handle error
    }
    if resp != nil {
        for _, failure := range resp.Failure {
            fmt.Printf("ack fail : %v\n", failure)
        }
    }
    return nil
}
메시지 확인 기한 수정
메시지 확인 기한(Ack Deadline)을 수정합니다.
- Extend: 메시지의 사용 시간을 현시점부터 서브스크립션의 AckDeadline 시간만큼 연장할 수 있습니다.
- Skip: 메시지의 사용 시간을 0으로 설정하여, 다른 클라이언트에서 메시지를 수신할 수 있게 합니다.
Method
| Type | Method | 설명 | 
|---|---|---|
| Subscription | ModifyAckDeadline(ctx context.Context, ackIDs []string, action AckAction) (*ModifyAckDeadlineResponse, error) | 메시지의 AckID를 사용하여 메시지의 사용 기간을 늘리거나 줄임 | 
Parameters
| Type | Field | 설명 | 
|---|---|---|
| []string | ackIDs | 메시지의 AckID | 
| AckAction | action | 메시지의 AckDeadline 연장 여부 pubsub.ActionExtend, pubsub.ActionSkip | 
pubsub.ModifyAckDeadlineResponse
| Type | Field | 설명 | 
|---|---|---|
| []*ModifyAckDeadlineSuccessResult | Success | 요청 성공한 정보 | 
| []*ModifyAckDeadlineFailureResult | Failure | 요청 실패한 정보 | 
pubsub.ModifyAckDeadlineSuccessResult
| Type | Field | 설명 | 
|---|---|---|
| string | AckID | 기존 AckID | 
| string | ReissuedAckID | 연장하면서 새로 발급된 AckID | 
pubsub.ModifyAckDeadlineFailureResult
| Type | Field | 설명 | 
|---|---|---|
| string | AckID | 요청 실패한 AckID | 
| error | Error | 요청 실패에 대한 이유 | 
Example
import (
    "context"
    "fmt"
    "github.kakaoenterprise.in/cloud-platform/kc-pub-sub-sdk-go"
)
func subscriptionModifyAckDeadline(domainID, projectID, subName string, ackIDs []string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, domainID, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %v", err)
    }
    defer client.Close()
    subscription := client.Subscription(subName)
    // Skip 예시
    resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionSkip)
    if err != nil {
        return fmt.Errorf("ModifyAckDeadline: %v", err)
    }
    // Extend 예시
    resp, err := subscription.ModifyAckDeadline(ctx, ackIDs, pubsub.ActionExtend)
    if err != nil {
        return fmt.Errorf("ModifyAckDeadline: %v", err)
    }
    return nil
}