1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
// internal/weather/utils.go
package weather
import (
"os"
"strconv"
"strings"
"time"
)
// GetEnv gets environment variable with default
func GetEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
// ParseDuration parses duration string
func ParseDuration(s string) time.Duration {
d, err := time.ParseDuration(s)
if err != nil {
return 5 * time.Minute
}
return d
}
// ParseInt parses integer string
func ParseInt(s string) int {
i, err := strconv.Atoi(s)
if err != nil {
return 0
}
return i
}
// ParseFloat parses float string
func ParseFloat(s string) float64 {
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0
}
return f
}
// ParseTopics parses comma-separated topics
func ParseTopics(s string) []string {
if s == "" {
return []string{
"weather/obs/fmi/#",
"weather/forecast/fmi/#",
}
}
return strings.Split(s, ",")
}
// LoadWatermark loads last observation time from file
func LoadWatermark(filename string) time.Time {
b, err := os.ReadFile(filename)
if err != nil {
return time.Now().UTC().Add(-24 * time.Hour)
}
sec, err := strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
if err != nil {
return time.Now().UTC().Add(-24 * time.Hour)
}
return time.Unix(sec, 0).UTC()
}
// StoreWatermark stores observation time to file
func StoreWatermark(filename string, t time.Time) error {
data := []byte(strconv.FormatInt(t.Unix(), 10))
return os.WriteFile(filename, data, 0644)
}
// InitTopicStats initializes topic statistics
func InitTopicStats() *TopicStats {
return &TopicStats{
MessagesReceived: make(map[string]int),
LastMessageTime: make(map[string]time.Time),
}
}
|