1package comms
2
3import (
4 "ewdetect/config"
5 "ewdetect/detect"
6 "ewdetect/events"
7 "ewdetect/stations"
8 "ewdetect/utils"
9 "time"
10
11 "github.com/GeoNet/kit/seis/ms"
12 "github.com/GeoNet/kit/seis/sl"
13 "github.com/rs/zerolog/log"
14)
15
16var Connections = make(map[string]*Connection)
17
18type Connection struct {
19 Buffers *map[string]*[config.BufferSize]int32
20 Heads *map[string]int
21 HeadLoops *map[string]int
22 StartTime *time.Time
23}
24
25// https://github.com/GeoNet/kit/blob/main/shake/intensity.go
26func StreamThread(url string) {
27 log.Info().Str("url", url).Msg("Connecting to stream")
28 var buffers = make(map[string]*[config.BufferSize]int32)
29 var heads = make(map[string]int)
30 var headLoops = make(map[string]int)
31 var startTime = time.Now()
32
33 Connections[url] = &Connection{&buffers, &heads, &headLoops, &startTime}
34 /*conn, err := sl.NewConn(url, 5*time.Second)
35 utils.CheckError(err)
36 defer conn.Close()
37
38 info, err := conn.GetInfo("streams") // stations also possible
39 utils.CheckError(err)
40
41 stations := info.Station
42 for _, station := range stations {
43 fmt.Println("Found", station.Name, "-", station.Description)
44 //fmt.Println(station.Stream)
45 }*/
46
47 sdl := sl.NewSLink(
48 sl.SetServer(url),
49 sl.SetTimeout(5*time.Second),
50 sl.SetKeepAlive(30*time.Second),
51 sl.SetStreams("*"),
52 sl.SetSelectors("EHZ"), // Short Period - Identified in seismograms with the suffix "EHZ" - sensitive velocity seismometers with a response peaked around 1 Hz. Typically only a single vertical component. Primarily used for determination of locations and magnitudes of small regional earthquakes. Strong Motion - Identified in seismograms by suffixes: "ENZ" or "HNZ" (Vertical), "ENE" or "HNE" (East-West horizontal), and "ENN" or "HNN" (North-South horizontal) - accelerometers with three components. Designed to record on-scale waveforms from moderate and large regional earthquakes that give rise to strong shaking. Broadband - Identified in seismograms by suffixes: "BHZ" or "HHZ" (Vertical), "BHE" or "HHE" (East-West horizontal), or "BHN" or "HHN" (North-South horizontal - velocity seismometers with a wide frequency response. Primary purpose is to record waveforms from regional and distant earthquakes for research purposes.
53
54 sl.SetStart(startTime),
55 sl.SetEnd(time.Time{}),
56 )
57
58 if err := sdl.Collect(func(seq string, pkt []byte) (bool, error) {
59 var record ms.Record
60 err := record.Unpack(pkt)
61 utils.CheckError(err)
62
63 stationName := record.Station()
64 data, err := record.Int32s()
65 if err != nil {
66 log.Warn().Str("station", stationName).Msg("Got invalid data from station, ignoring")
67 } else {
68 if _, ok := buffers[stationName]; !ok {
69 buffers[stationName] = &[config.BufferSize]int32{}
70 heads[stationName] = 0
71 headLoops[stationName] = 0
72 log.Info().Str("station", stationName).Str("url", url).Msg("Registered new station")
73 }
74
75 for i, datum := range data {
76 buffers[stationName][(heads[stationName]+i)%config.BufferSize] = datum
77 }
78 heads[stationName] = (heads[stationName] + len(data)) % config.BufferSize
79 headLoops[stationName] += (heads[stationName] + len(data)) / config.BufferSize
80 if _, ok := stations.Stations[url]; ok {
81 sampleRate := (*stations.Stations[url])[stationName].SampleRate
82 if time.Since(startTime) > time.Duration((1e9*config.BufferSize)/float64(sampleRate)) {
83 detected, pWaveArrival, sWaveArrival := detect.ThresholdDetectWaves(buffers[stationName], heads[stationName], "", false)
84 if detected {
85 timeOfPWaveArrival := startTime.Add(time.Duration(1e9 * float64(pWaveArrival) / float64((*stations.Stations[url])[stationName].SampleRate)))
86 timeOfPWaveArrival = timeOfPWaveArrival.Add(time.Duration(1e9 * float64((headLoops[stationName]-1)*config.BufferSize) / float64(sampleRate)))
87
88 timeOfSWaveArrival := startTime.Add(time.Duration(1e9 * float64(sWaveArrival) / float64((*stations.Stations[url])[stationName].SampleRate)))
89 timeOfSWaveArrival = timeOfSWaveArrival.Add(time.Duration(1e9 * float64((headLoops[stationName]-1)*config.BufferSize) / float64(sampleRate)))
90
91 if eventName, isNewEvent := events.NewDetection(url, stationName, timeOfPWaveArrival, timeOfSWaveArrival); isNewEvent {
92 log.Info().
93 Str("connection", url).
94 Str("station", stationName).
95 Int("pWaveArrival", pWaveArrival).
96 Int("sWaveArrival", sWaveArrival).
97 Msg("Detected wave arrival")
98 detect.ThresholdDetectWaves(buffers[stationName], heads[stationName], eventName+"/"+url+"."+stationName, true)
99 }
100 }
101 }
102 }
103 }
104 return false, nil
105 }); err != nil {
106 log.Error().Err(err).Msg("Error collecting stream data")
107 }
108 select {}
109}