123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503 |
- // Copyright The OpenTelemetry Authors
- // SPDX-License-Identifier: Apache-2.0
- package apachesparkreceiver
- import (
- "encoding/json"
- "net/http"
- "net/http/httptest"
- "os"
- "path/filepath"
- "strings"
- "testing"
- "github.com/stretchr/testify/require"
- "go.opentelemetry.io/collector/component"
- "go.opentelemetry.io/collector/component/componenttest"
- "go.opentelemetry.io/collector/config/confighttp"
- "go.uber.org/zap"
- "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/apachesparkreceiver/internal/models"
- )
- const (
- testURL = "http://sparkmock.com"
- )
- func TestNewApacheSparkClient(t *testing.T) {
- testCases := []struct {
- desc string
- cfg *Config
- host component.Host
- settings component.TelemetrySettings
- logger *zap.Logger
- expectError error
- }{
- {
- desc: "Valid Configuration",
- cfg: &Config{
- HTTPClientSettings: confighttp.HTTPClientSettings{
- Endpoint: defaultEndpoint,
- },
- },
- host: componenttest.NewNopHost(),
- settings: componenttest.NewNopTelemetrySettings(),
- logger: zap.NewNop(),
- expectError: nil,
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, func(t *testing.T) {
- ac, err := newApacheSparkClient(tc.cfg, tc.host, tc.settings)
- if tc.expectError != nil {
- require.Nil(t, ac)
- require.Contains(t, err.Error(), tc.expectError.Error())
- } else {
- require.NoError(t, err)
- actualClient, ok := ac.(*apacheSparkClient)
- require.True(t, ok)
- require.Equal(t, tc.logger, actualClient.logger)
- require.NotNil(t, actualClient.client)
- }
- })
- }
- }
- func TestClusterStats(t *testing.T) {
- testCases := []struct {
- desc string
- testFunc func(*testing.T)
- }{
- {
- desc: "Returns an error on Get() failure",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, clusterStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasSuffix(r.RequestURI, "metrics/json") {
- w.WriteHeader(http.StatusUnauthorized)
- } else {
- _, err := w.Write(data)
- require.NoError(t, err)
- }
- }))
- defer ts.Close()
- tc := createTestClient(t, testURL)
- clusterStats, err := tc.ClusterStats()
- require.NotNil(t, err)
- require.Nil(t, clusterStats)
- },
- },
- {
- desc: "Handles case where result cannot be unmarshalled",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, clusterStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "/metrics/json") {
- _, err = w.Write([]byte("[{}]"))
- require.NoError(t, err)
- } else {
- _, err = w.Write(data)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- clusterStats, err := tc.ClusterStats()
- require.Nil(t, clusterStats)
- require.NotNil(t, err)
- },
- },
- {
- desc: "Success case",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, clusterStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "/metrics/json") {
- _, err = w.Write(data)
- require.NoError(t, err)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- var expected *models.ClusterProperties
- data = loadAPIResponseData(t, clusterStatsResponseFile)
- err := json.Unmarshal(data, &expected)
- require.NoError(t, err)
- clusterStats, err := tc.ClusterStats()
- require.NoError(t, err)
- require.NotNil(t, clusterStats)
- require.Equal(t, expected, clusterStats)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, tc.testFunc)
- }
- }
- func TestApplications(t *testing.T) {
- testCases := []struct {
- desc string
- testFunc func(*testing.T)
- }{
- {
- desc: "Returns an error on Get() failure",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, appsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasSuffix(r.RequestURI, "applications") {
- w.WriteHeader(http.StatusUnauthorized)
- } else {
- _, err := w.Write(data)
- require.NoError(t, err)
- }
- }))
- defer ts.Close()
- tc := createTestClient(t, testURL)
- apps, err := tc.Applications()
- require.NotNil(t, err)
- require.Nil(t, apps)
- },
- },
- {
- desc: "Handles case where result cannot be unmarshalled",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, appsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "applications") {
- _, err = w.Write([]byte(""))
- require.NoError(t, err)
- } else {
- _, err = w.Write(data)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- apps, err := tc.Applications()
- require.Nil(t, apps)
- require.NotNil(t, err)
- },
- },
- {
- desc: "Success case",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, appsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "applications") {
- _, err = w.Write(data)
- require.NoError(t, err)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- var expected []models.Application
- data = loadAPIResponseData(t, appsStatsResponseFile)
- err := json.Unmarshal(data, &expected)
- require.NoError(t, err)
- apps, err := tc.Applications()
- require.NoError(t, err)
- require.NotNil(t, apps)
- require.Equal(t, expected, apps)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, tc.testFunc)
- }
- }
- func TestStageStats(t *testing.T) {
- testCases := []struct {
- desc string
- testFunc func(*testing.T)
- }{
- {
- desc: "Returns an error on Get() failure",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, stagesStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasSuffix(r.RequestURI, "stages") {
- w.WriteHeader(http.StatusUnauthorized)
- } else {
- _, err := w.Write(data)
- require.NoError(t, err)
- }
- }))
- defer ts.Close()
- tc := createTestClient(t, testURL)
- stageStats, err := tc.StageStats("some_app_id")
- require.NotNil(t, err)
- require.Nil(t, stageStats)
- },
- },
- {
- desc: "Handles case where result cannot be unmarshalled",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, stagesStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "stages") {
- _, err = w.Write([]byte(""))
- require.NoError(t, err)
- } else {
- _, err = w.Write(data)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- stageStats, err := tc.StageStats("some_app_id")
- require.Nil(t, stageStats)
- require.NotNil(t, err)
- },
- },
- {
- desc: "Success case",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, stagesStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "stages") {
- _, err = w.Write(data)
- require.NoError(t, err)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- var expected []models.Stage
- data = loadAPIResponseData(t, stagesStatsResponseFile)
- err := json.Unmarshal(data, &expected)
- require.NoError(t, err)
- stageStats, err := tc.StageStats("some_app_id")
- require.NoError(t, err)
- require.NotNil(t, stageStats)
- require.Equal(t, expected, stageStats)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, tc.testFunc)
- }
- }
- func TestExecutorStats(t *testing.T) {
- testCases := []struct {
- desc string
- testFunc func(*testing.T)
- }{
- {
- desc: "Returns an error on Get() failure",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, executorsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasSuffix(r.RequestURI, "executors") {
- w.WriteHeader(http.StatusUnauthorized)
- } else {
- _, err := w.Write(data)
- require.NoError(t, err)
- }
- }))
- defer ts.Close()
- tc := createTestClient(t, testURL)
- executorStats, err := tc.ExecutorStats("some_app_id")
- require.NotNil(t, err)
- require.Nil(t, executorStats)
- },
- },
- {
- desc: "Handles case where result cannot be unmarshalled",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, executorsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "executors") {
- _, err = w.Write([]byte(""))
- require.NoError(t, err)
- } else {
- _, err = w.Write(data)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- executorStats, err := tc.ExecutorStats("some_app_id")
- require.Nil(t, executorStats)
- require.NotNil(t, err)
- },
- },
- {
- desc: "Success case",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, executorsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "executors") {
- _, err = w.Write(data)
- require.NoError(t, err)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- var expected []models.Executor
- data = loadAPIResponseData(t, executorsStatsResponseFile)
- err := json.Unmarshal(data, &expected)
- require.NoError(t, err)
- executorStats, err := tc.ExecutorStats("some_app_id")
- require.NoError(t, err)
- require.NotNil(t, executorStats)
- require.Equal(t, expected, executorStats)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, tc.testFunc)
- }
- }
- func TestJobStats(t *testing.T) {
- testCases := []struct {
- desc string
- testFunc func(*testing.T)
- }{
- {
- desc: "Returns an error on Get() failure",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, jobsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- if strings.HasSuffix(r.RequestURI, "jobs") {
- w.WriteHeader(http.StatusUnauthorized)
- } else {
- _, err := w.Write(data)
- require.NoError(t, err)
- }
- }))
- defer ts.Close()
- tc := createTestClient(t, testURL)
- jobStats, err := tc.JobStats("some_app_id")
- require.NotNil(t, err)
- require.Nil(t, jobStats)
- },
- },
- {
- desc: "Handles case where result cannot be unmarshalled",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, jobsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "jobs") {
- _, err = w.Write([]byte(""))
- require.NoError(t, err)
- } else {
- _, err = w.Write(data)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- jobStats, err := tc.JobStats("some_app_id")
- require.Nil(t, jobStats)
- require.NotNil(t, err)
- },
- },
- {
- desc: "Success case",
- testFunc: func(t *testing.T) {
- data := loadAPIResponseData(t, jobsStatsResponseFile)
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- var err error
- if strings.HasSuffix(r.RequestURI, "jobs") {
- _, err = w.Write(data)
- require.NoError(t, err)
- }
- require.NoError(t, err)
- }))
- defer ts.Close()
- tc := createTestClient(t, ts.URL)
- var expected []models.Job
- data = loadAPIResponseData(t, jobsStatsResponseFile)
- err := json.Unmarshal(data, &expected)
- require.NoError(t, err)
- jobStats, err := tc.JobStats("some_app_id")
- require.NoError(t, err)
- require.NotNil(t, jobStats)
- require.Equal(t, expected, jobStats)
- },
- },
- }
- for _, tc := range testCases {
- t.Run(tc.desc, tc.testFunc)
- }
- }
- func createTestClient(t *testing.T, baseEndpoint string) client {
- t.Helper()
- cfg := createDefaultConfig().(*Config)
- cfg.Endpoint = baseEndpoint
- testClient, err := newApacheSparkClient(cfg, componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
- require.NoError(t, err)
- return testClient
- }
- func loadAPIResponseData(t *testing.T, fileName string) []byte {
- t.Helper()
- fullPath := filepath.Join("testdata", "apiresponses", fileName)
- data, err := os.ReadFile(fullPath)
- require.NoError(t, err)
- return data
- }
|