1package comms
  2
  3import (
  4	"ewdetect/config"
  5	"ewdetect/detect"
  6	"ewdetect/events"
  7	"ewdetect/stations"
  8	"ewdetect/utils"
  9	"time"
 10
 11	"github.com/GeoNet/kit/seis/ms"
 12	"github.com/GeoNet/kit/seis/sl"
 13	"github.com/rs/zerolog/log"
 14)
 15
 16var Connections = make(map[string]*Connection)
 17
 18type Connection struct {
 19	Buffers   *map[string]*[config.BufferSize]int32
 20	Heads     *map[string]int
 21	HeadLoops *map[string]int
 22	StartTime *time.Time
 23}
 24
 25// https://github.com/GeoNet/kit/blob/main/shake/intensity.go
 26func StreamThread(url string) {
 27	log.Info().Str("url", url).Msg("Connecting to stream")
 28	var buffers = make(map[string]*[config.BufferSize]int32)
 29	var heads = make(map[string]int)
 30	var headLoops = make(map[string]int)
 31	var startTime = time.Now()
 32
 33	Connections[url] = &Connection{&buffers, &heads, &headLoops, &startTime}
 34	/*conn, err := sl.NewConn(url, 5*time.Second)
 35	utils.CheckError(err)
 36	defer conn.Close()
 37
 38	info, err := conn.GetInfo("streams") // stations also possible
 39	utils.CheckError(err)
 40
 41	stations := info.Station
 42	for _, station := range stations {
 43		fmt.Println("Found", station.Name, "-", station.Description)
 44		//fmt.Println(station.Stream)
 45	}*/
 46
 47	sdl := sl.NewSLink(
 48		sl.SetServer(url),
 49		sl.SetTimeout(5*time.Second),
 50		sl.SetKeepAlive(30*time.Second),
 51		sl.SetStreams("*"),
 52		sl.SetSelectors("EHZ"), // Short Period - Identified in seismograms with the suffix "EHZ" - sensitive velocity seismometers with a response peaked around 1 Hz. Typically only a single vertical component. Primarily used for determination of locations and magnitudes of small regional earthquakes. Strong Motion - Identified in seismograms by suffixes: "ENZ" or "HNZ" (Vertical), "ENE" or "HNE" (East-West horizontal), and "ENN" or "HNN" (North-South horizontal) - accelerometers with three components. Designed to record on-scale waveforms from moderate and large regional earthquakes that give rise to strong shaking. Broadband - Identified in seismograms by suffixes: "BHZ" or "HHZ" (Vertical), "BHE" or "HHE" (East-West horizontal), or "BHN" or "HHN" (North-South horizontal - velocity seismometers with a wide frequency response. Primary purpose is to record waveforms from regional and distant earthquakes for research purposes.
 53
 54		sl.SetStart(startTime),
 55		sl.SetEnd(time.Time{}),
 56	)
 57
 58	if err := sdl.Collect(func(seq string, pkt []byte) (bool, error) {
 59		var record ms.Record
 60		err := record.Unpack(pkt)
 61		utils.CheckError(err)
 62
 63		stationName := record.Station()
 64		data, err := record.Int32s()
 65		if err != nil {
 66			log.Warn().Str("station", stationName).Msg("Got invalid data from station, ignoring")
 67		} else {
 68			if _, ok := buffers[stationName]; !ok {
 69				buffers[stationName] = &[config.BufferSize]int32{}
 70				heads[stationName] = 0
 71				headLoops[stationName] = 0
 72				log.Info().Str("station", stationName).Str("url", url).Msg("Registered new station")
 73			}
 74
 75			for i, datum := range data {
 76				buffers[stationName][(heads[stationName]+i)%config.BufferSize] = datum
 77			}
 78			heads[stationName] = (heads[stationName] + len(data)) % config.BufferSize
 79			headLoops[stationName] += (heads[stationName] + len(data)) / config.BufferSize
 80			if _, ok := stations.Stations[url]; ok {
 81				sampleRate := (*stations.Stations[url])[stationName].SampleRate
 82				if time.Since(startTime) > time.Duration((1e9*config.BufferSize)/float64(sampleRate)) {
 83					detected, pWaveArrival, sWaveArrival := detect.ThresholdDetectWaves(buffers[stationName], heads[stationName], "", false)
 84					if detected {
 85						timeOfPWaveArrival := startTime.Add(time.Duration(1e9 * float64(pWaveArrival) / float64((*stations.Stations[url])[stationName].SampleRate)))
 86						timeOfPWaveArrival = timeOfPWaveArrival.Add(time.Duration(1e9 * float64((headLoops[stationName]-1)*config.BufferSize) / float64(sampleRate)))
 87
 88						timeOfSWaveArrival := startTime.Add(time.Duration(1e9 * float64(sWaveArrival) / float64((*stations.Stations[url])[stationName].SampleRate)))
 89						timeOfSWaveArrival = timeOfSWaveArrival.Add(time.Duration(1e9 * float64((headLoops[stationName]-1)*config.BufferSize) / float64(sampleRate)))
 90
 91						if eventName, isNewEvent := events.NewDetection(url, stationName, timeOfPWaveArrival, timeOfSWaveArrival); isNewEvent {
 92							log.Info().
 93								Str("connection", url).
 94								Str("station", stationName).
 95								Int("pWaveArrival", pWaveArrival).
 96								Int("sWaveArrival", sWaveArrival).
 97								Msg("Detected wave arrival")
 98							detect.ThresholdDetectWaves(buffers[stationName], heads[stationName], eventName+"/"+url+"."+stationName, true)
 99						}
100					}
101				}
102			}
103		}
104		return false, nil
105	}); err != nil {
106		log.Error().Err(err).Msg("Error collecting stream data")
107	}
108	select {}
109}