client.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. // Copyright The OpenTelemetry Authors
  2. // SPDX-License-Identifier: Apache-2.0
  3. package saphanareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/saphanareceiver"
  4. import (
  5. "context"
  6. "database/sql"
  7. "database/sql/driver"
  8. "errors"
  9. "fmt"
  10. sapdriver "github.com/SAP/go-hdb/driver"
  11. "go.opentelemetry.io/collector/receiver/scrapererror"
  12. )
  13. // Interface for a SAP HANA client. Implementation can be faked for testing.
  14. type client interface {
  15. Connect(ctx context.Context) error
  16. collectDataFromQuery(ctx context.Context, query *monitoringQuery) ([]map[string]string, error)
  17. Close() error
  18. }
  19. // Wraps the result of a query so that it can be mocked in tests
  20. type resultWrapper interface {
  21. Scan(dest ...any) error
  22. Close() error
  23. Next() bool
  24. }
  25. // Wraps the sqlDB interface so that it can be mocked in tests
  26. type dbWrapper interface {
  27. PingContext(ctx context.Context) error
  28. Close() error
  29. QueryContext(ctx context.Context, query string) (resultWrapper, error)
  30. }
  31. type standardResultWrapper struct {
  32. rows *sql.Rows
  33. }
  34. func (w *standardResultWrapper) Next() bool {
  35. return w.rows.Next()
  36. }
  37. func (w *standardResultWrapper) Scan(dest ...any) error {
  38. return w.rows.Scan(dest...)
  39. }
  40. func (w *standardResultWrapper) Close() error {
  41. return w.rows.Close()
  42. }
  43. type standardDBWrapper struct {
  44. db *sql.DB
  45. }
  46. func (w *standardDBWrapper) Close() error {
  47. return w.db.Close()
  48. }
  49. func (w *standardDBWrapper) PingContext(ctx context.Context) error {
  50. return w.db.PingContext(ctx)
  51. }
  52. func (w *standardDBWrapper) QueryContext(ctx context.Context, query string) (resultWrapper, error) {
  53. rows, err := w.db.QueryContext(ctx, query)
  54. if err != nil {
  55. return nil, err
  56. }
  57. resultWrapper := standardResultWrapper{rows}
  58. return &resultWrapper, nil
  59. }
  60. // Wraps the creation of a sqlDB so that it can be mocked in tests
  61. type sapHanaConnectionFactory interface {
  62. getConnection(c driver.Connector) dbWrapper
  63. }
  64. type defaultConnectionFactory struct{}
  65. func (f *defaultConnectionFactory) getConnection(c driver.Connector) dbWrapper {
  66. wrapper := standardDBWrapper{db: sql.OpenDB((c))}
  67. return &wrapper
  68. }
  69. // Wraps a SAP HANA database connection, implements `client` interface.
  70. type sapHanaClient struct {
  71. receiverConfig *Config
  72. connectionFactory sapHanaConnectionFactory
  73. client dbWrapper
  74. }
  75. var _ client = (*sapHanaClient)(nil)
  76. // Creates a SAP HANA database client
  77. func newSapHanaClient(cfg *Config, factory sapHanaConnectionFactory) client {
  78. return &sapHanaClient{
  79. receiverConfig: cfg,
  80. connectionFactory: factory,
  81. }
  82. }
  83. func (c *sapHanaClient) Connect(ctx context.Context) error {
  84. connector, err := sapdriver.NewDSNConnector(fmt.Sprintf("hdb://%s:%s@%s", c.receiverConfig.Username, c.receiverConfig.Password, c.receiverConfig.TCPAddr.Endpoint))
  85. if err != nil {
  86. return fmt.Errorf("error generating DSN for SAP HANA connection: %w", err)
  87. }
  88. tls, err := c.receiverConfig.TLSClientSetting.LoadTLSConfig()
  89. if err != nil {
  90. return fmt.Errorf("error generating TLS config for SAP HANA connection: %w", err)
  91. }
  92. connector.SetTLSConfig(tls)
  93. connector.SetApplicationName("OpenTelemetry Collector")
  94. client := c.connectionFactory.getConnection(connector)
  95. err = client.PingContext(ctx)
  96. if err == nil {
  97. c.client = client
  98. } else {
  99. client.Close()
  100. }
  101. return err
  102. }
  103. func (c *sapHanaClient) Close() error {
  104. if c.client != nil {
  105. client := c.client
  106. c.client = nil
  107. return client.Close()
  108. }
  109. return nil
  110. }
  111. func (c *sapHanaClient) collectDataFromQuery(ctx context.Context, query *monitoringQuery) ([]map[string]string, error) {
  112. rows, err := c.client.QueryContext(ctx, query.query)
  113. if err != nil {
  114. return nil, err
  115. }
  116. defer rows.Close()
  117. errors := scrapererror.ScrapeErrors{}
  118. var data []map[string]string
  119. ROW_ITERATOR:
  120. for rows.Next() {
  121. expectedFields := len(query.orderedMetricLabels) + len(query.orderedResourceLabels) + len(query.orderedStats)
  122. rowFields := make([]any, expectedFields)
  123. // Build a list of addresses that rows.Scan will load column data into
  124. for i := range rowFields {
  125. rowFields[i] = new(sql.NullString)
  126. }
  127. if err := rows.Scan(rowFields...); err != nil {
  128. return nil, err
  129. }
  130. values := map[string]string{}
  131. for _, label := range query.orderedResourceLabels {
  132. v, err := convertInterfaceToString(rowFields[0])
  133. if err != nil {
  134. errors.AddPartial(0, err)
  135. continue ROW_ITERATOR
  136. }
  137. // If value was null, we can't use this row
  138. if !v.Valid {
  139. errors.AddPartial(0, fmt.Errorf("database row NULL value for required resource label %s", label))
  140. continue ROW_ITERATOR
  141. }
  142. values[label] = v.String
  143. rowFields = rowFields[1:]
  144. }
  145. for _, label := range query.orderedMetricLabels {
  146. v, err := convertInterfaceToString(rowFields[0])
  147. if err != nil {
  148. errors.AddPartial(0, err)
  149. continue ROW_ITERATOR
  150. }
  151. // If value was null, we can't use this row
  152. if !v.Valid {
  153. errors.AddPartial(0, fmt.Errorf("database row NULL value for required metric label %s", label))
  154. continue ROW_ITERATOR
  155. }
  156. values[label] = v.String
  157. rowFields = rowFields[1:]
  158. }
  159. for _, stat := range query.orderedStats {
  160. v, err := convertInterfaceToString(rowFields[0])
  161. if err != nil {
  162. errors.AddPartial(0, err)
  163. continue
  164. }
  165. // Only report stat if value was not NULL
  166. if v.Valid {
  167. values[stat.key] = v.String
  168. }
  169. rowFields = rowFields[1:]
  170. }
  171. data = append(data, values)
  172. }
  173. return data, errors.Combine()
  174. }
  175. func convertInterfaceToString(input any) (sql.NullString, error) {
  176. if val, ok := input.(*sql.NullString); ok {
  177. return *val, nil
  178. }
  179. return sql.NullString{}, errors.New("issue converting interface into string")
  180. }