mirror of
https://gitlab.com/pulsechaincom/go-pulse.git
synced 2024-12-31 16:11:21 +00:00
c76ad94492
This commit adds a build step to travis to auto-delete unstable archives older than 14 days (our regular release schedule) from Azure via ci.go purge. The commit also pulls in the latest Azure storage code, also switching over from the old import path (github.com/Azure/azure-sdk-for-go) to the new split one (github.com/Azure/azure-storage-go).
340 lines
12 KiB
Go
340 lines
12 KiB
Go
package storage
|
|
|
|
import (
|
|
"encoding/xml"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// The header names below are spelled with the casing that Go's http.Header
// canonicalization produces, so they match response header keys as-is.

// approximateMessagesCountHeader is the response header that GetMetadata
// parses into QueueMetadataResponse.ApproximateMessageCount.
const approximateMessagesCountHeader = "X-Ms-Approximate-Messages-Count"

// userDefinedMetadataHeaderPrefix is the prefix put in front of every
// user-defined metadata name when it travels as an HTTP header.
const userDefinedMetadataHeaderPrefix = "X-Ms-Meta-"
|
|
|
|
// pathForQueue returns the URL path of the queue resource: "/<queue>".
// The name is inserted verbatim; no escaping is applied here.
func pathForQueue(queue string) string { return "/" + queue }
|
|
// pathForQueueMessages returns the URL path of a queue's message collection:
// "/<queue>/messages". The name is inserted verbatim; no escaping is applied
// here.
func pathForQueueMessages(queue string) string { return "/" + queue + "/messages" }
|
|
// pathForMessage returns the URL path of a single message:
// "/<queue>/messages/<name>". Both parts are inserted verbatim; no escaping
// is applied here.
func pathForMessage(queue, name string) string { return "/" + queue + "/messages/" + name }
|
|
|
|
// putMessageRequest is the XML request body used by PutMessage and
// UpdateMessage. It marshals to a <QueueMessage> element that wraps the
// message text in a <MessageText> child.
type putMessageRequest struct {
	// XMLName fixes the name of the root element.
	XMLName xml.Name `xml:"QueueMessage"`
	// MessageText is the message content.
	MessageText string `xml:"MessageText"`
}
|
|
|
|
// PutMessageParameters is the set of options that can be specified for the
// Put Message operation. A zero struct does not use any preferences for the
// request.
type PutMessageParameters struct {
	VisibilityTimeout int // sent as "visibilitytimeout" when non-zero
	MessageTTL        int // sent as "messagettl" when non-zero
}

// getParameters converts the non-zero fields into URL query values. The
// result is never nil.
func (p PutMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	for _, opt := range []struct {
		key   string
		value int
	}{
		{"visibilitytimeout", p.VisibilityTimeout},
		{"messagettl", p.MessageTTL},
	} {
		if opt.value != 0 {
			query.Set(opt.key, strconv.Itoa(opt.value))
		}
	}
	return query
}
|
|
|
|
// GetMessagesParameters is the set of options that can be specified for the
// Get Messages operation. A zero struct does not use any preferences for the
// request.
type GetMessagesParameters struct {
	NumOfMessages     int // sent as "numofmessages" when non-zero
	VisibilityTimeout int // sent as "visibilitytimeout" when non-zero
}

// getParameters converts the non-zero fields into URL query values. The
// result is never nil.
func (p GetMessagesParameters) getParameters() url.Values {
	query := make(url.Values)
	addInt := func(key string, n int) {
		if n != 0 {
			query.Set(key, strconv.Itoa(n))
		}
	}
	addInt("numofmessages", p.NumOfMessages)
	addInt("visibilitytimeout", p.VisibilityTimeout)
	return query
}
|
|
|
|
// PeekMessagesParameters is the set of options that can be specified for the
// Peek Messages operation. A zero struct does not use any preferences for the
// request.
type PeekMessagesParameters struct {
	NumOfMessages int // sent as "numofmessages" when non-zero
}

// getParameters builds the URL query values for a peek request. The result
// always carries peekonly=true, which is required for the peek operation.
func (p PeekMessagesParameters) getParameters() url.Values {
	query := make(url.Values)
	query.Set("peekonly", "true")
	if p.NumOfMessages == 0 {
		return query
	}
	query.Set("numofmessages", strconv.Itoa(p.NumOfMessages))
	return query
}
|
|
|
|
// UpdateMessageParameters is the set of options that can be specified for the
// Update Message operation. A zero struct does not use any preferences for
// the request.
type UpdateMessageParameters struct {
	PopReceipt        string // sent as "popreceipt" when non-empty
	VisibilityTimeout int    // sent as "visibilitytimeout" when non-zero
}

// getParameters converts the populated fields into URL query values. The
// result is never nil.
func (p UpdateMessageParameters) getParameters() url.Values {
	query := make(url.Values)
	if receipt := p.PopReceipt; len(receipt) > 0 {
		query.Set("popreceipt", receipt)
	}
	if timeout := p.VisibilityTimeout; timeout != 0 {
		query.Set("visibilitytimeout", strconv.Itoa(timeout))
	}
	return query
}
|
|
|
|
// GetMessageResponse represents a single QueueMessage object inside the
// response of the Get Messages operation. Every field is decoded from the
// XML child element of the same name as its tag.
type GetMessageResponse struct {
	MessageID       string `xml:"MessageId"`
	InsertionTime   string `xml:"InsertionTime"`
	ExpirationTime  string `xml:"ExpirationTime"`
	PopReceipt      string `xml:"PopReceipt"`
	TimeNextVisible string `xml:"TimeNextVisible"`
	DequeueCount    int    `xml:"DequeueCount"`
	MessageText     string `xml:"MessageText"`
}

// GetMessagesResponse represents the response returned from the Get Messages
// operation: a QueueMessagesList element holding zero or more QueueMessage
// entries.
type GetMessagesResponse struct {
	XMLName           xml.Name             `xml:"QueueMessagesList"`
	QueueMessagesList []GetMessageResponse `xml:"QueueMessage"`
}
|
|
|
|
// PeekMessageResponse represents a single QueueMessage object inside the
// response of the Peek Messages operation. Every field is decoded from the
// XML child element of the same name as its tag.
type PeekMessageResponse struct {
	MessageID      string `xml:"MessageId"`
	InsertionTime  string `xml:"InsertionTime"`
	ExpirationTime string `xml:"ExpirationTime"`
	DequeueCount   int    `xml:"DequeueCount"`
	MessageText    string `xml:"MessageText"`
}

// PeekMessagesResponse represents the response returned from the Peek
// Messages operation: a QueueMessagesList element holding zero or more
// QueueMessage entries.
type PeekMessagesResponse struct {
	XMLName           xml.Name              `xml:"QueueMessagesList"`
	QueueMessagesList []PeekMessageResponse `xml:"QueueMessage"`
}
|
|
|
|
// QueueMetadataResponse carries the queue properties and the user-defined
// metadata of a specific queue, as returned by GetMetadata.
//
// See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
type QueueMetadataResponse struct {
	// ApproximateMessageCount is the integer value of the
	// X-Ms-Approximate-Messages-Count response header.
	ApproximateMessageCount int
	// UserDefinedMetadata maps metadata names (lower-cased, with the
	// X-Ms-Meta- header prefix removed) to their values.
	UserDefinedMetadata map[string]string
}
|
|
|
|
// SetMetadata operation sets user-defined metadata on the specified queue.
|
|
// Metadata is associated with the queue as name-value pairs.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179348.aspx
|
|
func (c QueueServiceClient) SetMetadata(name string, metadata map[string]string) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
|
|
metadata = c.client.protectUserAgent(metadata)
|
|
headers := c.client.getStandardHeaders()
|
|
for k, v := range metadata {
|
|
headers[userDefinedMetadataHeaderPrefix+k] = v
|
|
}
|
|
|
|
resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
|
|
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
|
|
}
|
|
|
|
// GetMetadata operation retrieves user-defined metadata and queue
|
|
// properties on the specified queue. Metadata is associated with
|
|
// the queue as name-values pairs.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179384.aspx
|
|
//
|
|
// Because the way Golang's http client (and http.Header in particular)
|
|
// canonicalize header names, the returned metadata names would always
|
|
// be all lower case.
|
|
func (c QueueServiceClient) GetMetadata(name string) (QueueMetadataResponse, error) {
|
|
qm := QueueMetadataResponse{}
|
|
qm.UserDefinedMetadata = make(map[string]string)
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": []string{"metadata"}})
|
|
headers := c.client.getStandardHeaders()
|
|
resp, err := c.client.exec(http.MethodGet, uri, headers, nil, c.auth)
|
|
if err != nil {
|
|
return qm, err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
|
|
for k, v := range resp.headers {
|
|
if len(v) != 1 {
|
|
return qm, fmt.Errorf("Unexpected number of values (%d) in response header '%s'", len(v), k)
|
|
}
|
|
|
|
value := v[0]
|
|
|
|
if k == approximateMessagesCountHeader {
|
|
qm.ApproximateMessageCount, err = strconv.Atoi(value)
|
|
if err != nil {
|
|
return qm, fmt.Errorf("Unexpected value in response header '%s': '%s' ", k, value)
|
|
}
|
|
} else if strings.HasPrefix(k, userDefinedMetadataHeaderPrefix) {
|
|
name := strings.TrimPrefix(k, userDefinedMetadataHeaderPrefix)
|
|
qm.UserDefinedMetadata[strings.ToLower(name)] = value
|
|
}
|
|
}
|
|
|
|
return qm, checkRespCode(resp.statusCode, []int{http.StatusOK})
|
|
}
|
|
|
|
// CreateQueue operation creates a queue under the given account.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179342.aspx
|
|
func (c QueueServiceClient) CreateQueue(name string) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
|
|
headers := c.client.getStandardHeaders()
|
|
resp, err := c.client.exec(http.MethodPut, uri, headers, nil, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusCreated})
|
|
}
|
|
|
|
// DeleteQueue operation permanently deletes the specified queue.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179436.aspx
|
|
func (c QueueServiceClient) DeleteQueue(name string) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{})
|
|
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
|
|
}
|
|
|
|
// QueueExists returns true if a queue with given name exists.
|
|
func (c QueueServiceClient) QueueExists(name string) (bool, error) {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueue(name), url.Values{"comp": {"metadata"}})
|
|
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if resp != nil && (resp.statusCode == http.StatusOK || resp.statusCode == http.StatusNotFound) {
|
|
return resp.statusCode == http.StatusOK, nil
|
|
}
|
|
|
|
return false, err
|
|
}
|
|
|
|
// PutMessage operation adds a new message to the back of the message queue.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179346.aspx
|
|
func (c QueueServiceClient) PutMessage(queue string, message string, params PutMessageParameters) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
|
|
req := putMessageRequest{MessageText: message}
|
|
body, nn, err := xmlMarshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headers := c.client.getStandardHeaders()
|
|
headers["Content-Length"] = strconv.Itoa(nn)
|
|
resp, err := c.client.exec(http.MethodPost, uri, headers, body, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusCreated})
|
|
}
|
|
|
|
// ClearMessages operation deletes all messages from the specified queue.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179454.aspx
|
|
func (c QueueServiceClient) ClearMessages(queue string) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), url.Values{})
|
|
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
|
|
}
|
|
|
|
// GetMessages operation retrieves one or more messages from the front of the
|
|
// queue.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179474.aspx
|
|
func (c QueueServiceClient) GetMessages(queue string, params GetMessagesParameters) (GetMessagesResponse, error) {
|
|
var r GetMessagesResponse
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
|
|
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if err != nil {
|
|
return r, err
|
|
}
|
|
defer resp.body.Close()
|
|
err = xmlUnmarshal(resp.body, &r)
|
|
return r, err
|
|
}
|
|
|
|
// PeekMessages retrieves one or more messages from the front of the queue, but
|
|
// does not alter the visibility of the message.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179472.aspx
|
|
func (c QueueServiceClient) PeekMessages(queue string, params PeekMessagesParameters) (PeekMessagesResponse, error) {
|
|
var r PeekMessagesResponse
|
|
uri := c.client.getEndpoint(queueServiceName, pathForQueueMessages(queue), params.getParameters())
|
|
resp, err := c.client.exec(http.MethodGet, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if err != nil {
|
|
return r, err
|
|
}
|
|
defer resp.body.Close()
|
|
err = xmlUnmarshal(resp.body, &r)
|
|
return r, err
|
|
}
|
|
|
|
// DeleteMessage operation deletes the specified message.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/dd179347.aspx
|
|
func (c QueueServiceClient) DeleteMessage(queue, messageID, popReceipt string) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), url.Values{
|
|
"popreceipt": {popReceipt}})
|
|
resp, err := c.client.exec(http.MethodDelete, uri, c.client.getStandardHeaders(), nil, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
|
|
}
|
|
|
|
// UpdateMessage operation deletes the specified message.
|
|
//
|
|
// See https://msdn.microsoft.com/en-us/library/azure/hh452234.aspx
|
|
func (c QueueServiceClient) UpdateMessage(queue string, messageID string, message string, params UpdateMessageParameters) error {
|
|
uri := c.client.getEndpoint(queueServiceName, pathForMessage(queue, messageID), params.getParameters())
|
|
req := putMessageRequest{MessageText: message}
|
|
body, nn, err := xmlMarshal(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
headers := c.client.getStandardHeaders()
|
|
headers["Content-Length"] = fmt.Sprintf("%d", nn)
|
|
resp, err := c.client.exec(http.MethodPut, uri, headers, body, c.auth)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer readAndCloseBody(resp.body)
|
|
return checkRespCode(resp.statusCode, []int{http.StatusNoContent})
|
|
}
|