123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- package multitenant
- import (
- "context"
- "crypto/sha256"
- "encoding/base64"
- "flag"
- "math"
- "strings"
- "sync"
- "time"
- log "github.com/sirupsen/logrus"
- billing "github.com/weaveworks/billing-client"
- "github.com/weaveworks/scope/app"
- "github.com/weaveworks/scope/report"
- )
- // BillingEmitterConfig has everything we need to make a billing emitter
- type BillingEmitterConfig struct {
- Enabled bool
- DefaultInterval time.Duration
- UserIDer UserIDer
- }
- // RegisterFlags registers the billing emitter flags with the main flag set.
- func (cfg *BillingEmitterConfig) RegisterFlags(f *flag.FlagSet) {
- f.BoolVar(&cfg.Enabled, "app.billing.enabled", false, "enable emitting billing info")
- f.DurationVar(&cfg.DefaultInterval, "app.billing.default-publish-interval", 3*time.Second, "default publish interval to assume for reports")
- }
- // BillingEmitter is the billing emitter
- type BillingEmitter struct {
- app.Collector
- BillingEmitterConfig
- billing *billing.Client
- sync.Mutex
- intervalCache map[string]time.Duration
- rounding map[string]float64
- }
- // NewBillingEmitter changes a new billing emitter which emits billing events
- func NewBillingEmitter(upstream app.Collector, billingClient *billing.Client, cfg BillingEmitterConfig) (*BillingEmitter, error) {
- return &BillingEmitter{
- Collector: upstream,
- billing: billingClient,
- BillingEmitterConfig: cfg,
- intervalCache: make(map[string]time.Duration),
- rounding: make(map[string]float64),
- }, nil
- }
- // Add implements app.Collector
- func (e *BillingEmitter) Add(ctx context.Context, rep report.Report, buf []byte) error {
- now := time.Now().UTC()
- userID, err := e.UserIDer(ctx)
- if err != nil {
- // Underlying collector needs to get userID too, so it's OK to abort
- // here. If this fails, so will underlying collector so no point
- // proceeding.
- return err
- }
- rowKey, colKey := calculateDynamoKeys(userID, now)
- interval, nodes := e.scanReport(rep)
- // Cache the last-known value of interval for this user, and use
- // it if we didn't find one in this report.
- e.Lock()
- if interval != 0 {
- e.intervalCache[userID] = interval
- } else {
- if lastKnown, found := e.intervalCache[userID]; found {
- interval = lastKnown
- } else {
- interval = e.DefaultInterval
- }
- }
- // Billing takes an integer number of seconds, so keep track of the amount lost to rounding
- nodeSeconds := interval.Seconds()*float64(nodes) + e.rounding[userID]
- rounding := nodeSeconds - math.Floor(nodeSeconds)
- e.rounding[userID] = rounding
- e.Unlock()
- hasher := sha256.New()
- hasher.Write(buf)
- hash := "sha256:" + base64.URLEncoding.EncodeToString(hasher.Sum(nil))
- weaveNetCount := 0
- if hasWeaveNet(rep) {
- weaveNetCount = 1
- }
- amounts := billing.Amounts{
- billing.ContainerSeconds: int64(interval/time.Second) * int64(len(rep.Container.Nodes)),
- billing.NodeSeconds: int64(nodeSeconds),
- billing.WeaveNetSeconds: int64(interval/time.Second) * int64(weaveNetCount),
- }
- metadata := map[string]string{
- "row_key": rowKey,
- "col_key": colKey,
- }
- err = e.billing.AddAmounts(
- hash,
- userID,
- now,
- amounts,
- metadata,
- )
- if err != nil {
- // No return, because we want to proceed even if we fail to emit
- // billing data, so that defects in the billing system don't break
- // report collection. Just log the fact & carry on.
- log.Errorf("Failed emitting billing data: %v", err)
- }
- return e.Collector.Add(ctx, rep, buf)
- }
- func commandParameter(cmd, flag string) (string, bool) {
- i := strings.Index(cmd, flag)
- if i != -1 {
- // here we expect the command looks like `-foo=bar` or `-foo bar`
- aft := strings.Fields(cmd[i+len(flag):])
- if len(aft) > 0 && len(aft[0]) > 0 {
- if aft[0][0] == '=' {
- return aft[0][1:], true
- }
- return aft[0], true
- }
- }
- return "", false
- }
- func intervalFromCommand(cmd string) string {
- if strings.Contains(cmd, "scope") {
- if publishInterval, ok := commandParameter(cmd, "probe.publish.interval"); ok {
- // If spy interval is higher than publish interval, some reports will have no process data
- if spyInterval, ok := commandParameter(cmd, "spy.interval"); ok {
- pubDuration, err1 := time.ParseDuration(publishInterval)
- spyDuration, err2 := time.ParseDuration(spyInterval)
- if err1 == nil && err2 == nil && spyDuration > pubDuration {
- return spyInterval
- }
- }
- return publishInterval
- }
- }
- return ""
- }
- // scanReport counts the nodes tries to find any custom report interval
- // of this report. If it is malformed, or not set, it returns zero.
- func (e *BillingEmitter) scanReport(r report.Report) (time.Duration, int) {
- nHosts := 0
- // We scan the host nodes looking for ones reported by a per-node probe;
- // the Kubernetes cluster probe also makes host nodes but they only have a few fields set
- for _, h := range r.Host.Nodes {
- // Relying here on Uptime being something that changes in each report, hence will be in a delta report
- if _, ok := h.Latest.Lookup(report.Uptime); ok {
- nHosts++
- }
- }
- if r.Window != 0 {
- return r.Window, nHosts
- }
- var inter string
- for _, c := range r.Container.Nodes {
- if cmd, ok := c.Latest.Lookup(report.DockerContainerCommand); ok {
- if inter = intervalFromCommand(cmd); inter != "" {
- break
- }
- }
- }
- if inter == "" { // not found in containers: look in processes
- for _, c := range r.Process.Nodes {
- if cmd, ok := c.Latest.Lookup(report.Cmdline); ok {
- if inter = intervalFromCommand(cmd); inter != "" {
- break
- }
- }
- }
- }
- if inter == "" {
- return 0, nHosts
- }
- d, err := time.ParseDuration(inter)
- if err != nil {
- return 0, nHosts
- }
- return d, nHosts
- }
- // Tries to determine if this report came from a host running Weave Net
- func hasWeaveNet(r report.Report) bool {
- for _, n := range r.Overlay.Nodes {
- overlayType, _ := report.ParseOverlayNodeID(n.ID)
- if overlayType == report.WeaveOverlayPeerPrefix {
- return true
- }
- }
- return false
- }
- // Close shuts down the billing emitter and billing client flushing events.
- func (e *BillingEmitter) Close() {
- e.Collector.Close()
- _ = e.billing.Close()
- }
|