billing_emitter.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. package multitenant
  2. import (
  3. "context"
  4. "crypto/sha256"
  5. "encoding/base64"
  6. "flag"
  7. "math"
  8. "strings"
  9. "sync"
  10. "time"
  11. log "github.com/sirupsen/logrus"
  12. billing "github.com/weaveworks/billing-client"
  13. "github.com/weaveworks/scope/app"
  14. "github.com/weaveworks/scope/report"
  15. )
// BillingEmitterConfig has everything we need to make a billing emitter.
type BillingEmitterConfig struct {
	// Enabled is bound to the "app.billing.enabled" flag by RegisterFlags.
	Enabled bool
	// DefaultInterval is bound to the "app.billing.default-publish-interval"
	// flag by RegisterFlags. Add falls back to it when a report carries no
	// interval and none has been cached for the user.
	DefaultInterval time.Duration
	// UserIDer is called by Add with the request context to obtain the user
	// ID. It is not registered as a flag, so it has to be set by the caller.
	UserIDer UserIDer
}
  22. // RegisterFlags registers the billing emitter flags with the main flag set.
  23. func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) {
  24. f.BoolVar(&cfg.Enabled, "app.billing.enabled", false, "enable emitting billing info")
  25. f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", 3*time.Second, "default publish interval to assume for reports")
  26. }
// BillingEmitter is the billing emitter. It embeds an app.Collector and its
// Add method emits billing amounts for each report before passing the report
// on to that embedded collector.
type BillingEmitter struct {
	// Collector is the upstream collector that Add and Close delegate to.
	app.Collector
	BillingEmitterConfig
	// billing is the client the computed amounts are sent through.
	billing *billing.Client
	// Mutex is held by Add while it reads and writes intervalCache and rounding.
	sync.Mutex
	// intervalCache holds the last non-zero report interval seen per user ID.
	intervalCache map[string]time.Duration
	// rounding holds, per user ID, the fractional node-seconds left over after
	// truncating to whole seconds; it is added back in on the next report.
	rounding map[string]float64
}
  36. // NewBillingEmitter changes a new billing emitter which emits billing events
  37. func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) {
  38. return &BillingEmitter{
  39. Collector: upstream,
  40. billing: billingClient,
  41. BillingEmitterConfig: cfg,
  42. intervalCache: make(map[string]time.Duration),
  43. rounding: make(map[string]float64),
  44. }, nil
  45. }
  46. // Add implements app.Collector
  47. func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error {
  48. now := time.Now().UTC()
  49. userID, err := e.UserIDer(ctx)
  50. if err != nil {
  51. // Underlying collector needs to get userID too, so it's OK to abort
  52. // here. If this fails, so will underlying collector so no point
  53. // proceeding.
  54. return err
  55. }
  56. rowKey, colKey := calculateDynamoKeys(userID, now)
  57. interval, nodes := e.scanReport(rep)
  58. // Cache the last-known value of interval for this user, and use
  59. // it if we didn't find one in this report.
  60. e.Lock()
  61. if interval != 0 {
  62. e.intervalCache[userID] = interval
  63. } else {
  64. if lastKnown, found := e.intervalCache[userID]; found {
  65. interval = lastKnown
  66. } else {
  67. interval = e.DefaultInterval
  68. }
  69. }
  70. // Billing takes an integer number of seconds, so keep track of the amount lost to rounding
  71. nodeSeconds := interval.Seconds()*float64(nodes) + e.rounding[userID]
  72. rounding := nodeSeconds - math.Floor(nodeSeconds)
  73. e.rounding[userID] = rounding
  74. e.Unlock()
  75. hasher := sha256.New()
  76. hasher.Write(buf)
  77. hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil))
  78. weaveNetCount := 0
  79. if hasWeaveNet(rep) {
  80. weaveNetCount = 1
  81. }
  82. amounts := billing.Amounts{
  83. billing.ContainerSeconds: int64(interval/time.Second) * int64(len(rep.Container.Nodes)),
  84. billing.NodeSeconds: int64(nodeSeconds),
  85. billing.WeaveNetSeconds: int64(interval/time.Second) * int64(weaveNetCount),
  86. }
  87. metadata := map[string]string{
  88. "row_key": rowKey,
  89. "col_key": colKey,
  90. }
  91. err = e.billing.AddAmounts(
  92. hash,
  93. userID,
  94. now,
  95. amounts,
  96. metadata,
  97. )
  98. if err != nil {
  99. // No return, because we want to proceed even if we fail to emit
  100. // billing data, so that defects in the billing system don't break
  101. // report collection. Just log the fact & carry on.
  102. log.Errorf("Failed emitting billing data: %v", err)
  103. }
  104. return e.Collector.Add(ctx, rep, buf)
  105. }
  106. func commandParameter(cmd, flag string) (string, bool) {
  107. i := strings.Index(cmd, flag)
  108. if i != -1 {
  109. // here we expect the command looks like `-foo=bar` or `-foo bar`
  110. aft := strings.Fields(cmd[i+len(flag):])
  111. if len(aft) > 0 && len(aft[0]) > 0 {
  112. if aft[0][0] == '=' {
  113. return aft[0][1:], true
  114. }
  115. return aft[0], true
  116. }
  117. }
  118. return "", false
  119. }
  120. func intervalFromCommand(cmd string) string {
  121. if strings.Contains(cmd, "scope") {
  122. if publishInterval, ok := commandParameter(cmd, "probe.publish.interval"); ok {
  123. // If spy interval is higher than publish interval, some reports will have no process data
  124. if spyInterval, ok := commandParameter(cmd, "spy.interval"); ok {
  125. pubDuration, err1 := time.ParseDuration(publishInterval)
  126. spyDuration, err2 := time.ParseDuration(spyInterval)
  127. if err1 == nil && err2 == nil && spyDuration > pubDuration {
  128. return spyInterval
  129. }
  130. }
  131. return publishInterval
  132. }
  133. }
  134. return ""
  135. }
  136. // scanReport counts the nodes tries to find any custom report interval
  137. // of this report. If it is malformed, or not set, it returns zero.
  138. func (e *BillingEmitter) scanReport(r report.Report) (time.Duration, int) {
  139. nHosts := 0
  140. // We scan the host nodes looking for ones reported by a per-node probe;
  141. // the Kubernetes cluster probe also makes host nodes but they only have a few fields set
  142. for _, h := range r.Host.Nodes {
  143. // Relying here on Uptime being something that changes in each report, hence will be in a delta report
  144. if _, ok := h.Latest.Lookup(report.Uptime); ok {
  145. nHosts++
  146. }
  147. }
  148. if r.Window != 0 {
  149. return r.Window, nHosts
  150. }
  151. var inter string
  152. for _, c := range r.Container.Nodes {
  153. if cmd, ok := c.Latest.Lookup(report.DockerContainerCommand); ok {
  154. if inter = intervalFromCommand(cmd); inter != "" {
  155. break
  156. }
  157. }
  158. }
  159. if inter == "" { // not found in containers: look in processes
  160. for _, c := range r.Process.Nodes {
  161. if cmd, ok := c.Latest.Lookup(report.Cmdline); ok {
  162. if inter = intervalFromCommand(cmd); inter != "" {
  163. break
  164. }
  165. }
  166. }
  167. }
  168. if inter == "" {
  169. return 0, nHosts
  170. }
  171. d, err := time.ParseDuration(inter)
  172. if err != nil {
  173. return 0, nHosts
  174. }
  175. return d, nHosts
  176. }
  177. // Tries to determine if this report came from a host running Weave Net
  178. func hasWeaveNet(r report.Report) bool {
  179. for _, n := range r.Overlay.Nodes {
  180. overlayType, _ := report.ParseOverlayNodeID(n.ID)
  181. if overlayType == report.WeaveOverlayPeerPrefix {
  182. return true
  183. }
  184. }
  185. return false
  186. }
  187. // Close shuts down the billing emitter and billing client flushing events.
  188. func (e *BillingEmitter) Close() {
  189. e.Collector.Close()
  190. _ = e.billing.Close()
  191. }