diff --git a/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt b/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt index 57816a5..de6a912 100644 --- a/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt +++ b/app/src/main/java/it/reyboz/bustorino/backend/mato/MQTTMatoClient.kt @@ -1,495 +1,490 @@ package it.reyboz.bustorino.backend.mato import android.app.NotificationManager import android.content.Context import android.util.Log import androidx.lifecycle.LifecycleOwner import com.hivemq.client.mqtt.MqttClient import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnect import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish import it.reyboz.bustorino.BuildConfig import it.reyboz.bustorino.backend.LivePositionsServiceStatus import it.reyboz.bustorino.backend.gtfs.LivePositionUpdate import org.json.JSONArray import org.json.JSONException import java.lang.ref.WeakReference import java.util.* import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import kotlin.concurrent.withLock typealias PositionsMap = HashMap > class MQTTMatoClient(){ private var isStarted = false //private var subscribedToAll = false private var client: Mqtt3AsyncClient? = null //private var clientID = "" private val respondersMap = HashMap>>() private val currentPositions = PositionsMap() private val positionsLock = ReentrantLock() private lateinit var lifecycle: LifecycleOwner //TODO: remove class reference to context (always require context in all methods) private var context: Context?= null //private var connectionTrials = 0 //private var notification: Notification? = null private fun connect(context: Context, onConnect: OnConnect){ //val clientID = "mqtt-explorer-${getRandomString(8)}"//"mqttjs_${getRandomString(8)}" if (this.context ==null) this.context = context.applicationContext //notification = Notifications.makeMQTTServiceNotification(context) val mclient = makeClientBuilder().buildAsync() Log.d(DEBUG_TAG, "Connecting to MQTT server") //actually connect val mess = mclient.connect().whenComplete { subAck, throwable -> if(throwable!=null){ Log.w(DEBUG_TAG,"Failed to connect to MQTT server:") Log.w(DEBUG_TAG,throwable.toString()) } else{ isStarted = true Log.d(DEBUG_TAG, "Connected to MQTT server") onConnect.onConnectSuccess() } } client = mclient /* if (mess.returnCode != Mqtt3ConnAckReturnCode.SUCCESS){ Log.w(DEBUG_TAG,"Failed to connect to MQTT client") return false } else{ client = blockingClient.toAsync() isStarted = true return true } */ } /*override fun connectComplete(reconnect: Boolean, serverURI: String?) { Log.d(DEBUG_TAG, "Connected to server, reconnect: $reconnect") Log.d(DEBUG_TAG, "Have listeners: $respondersMap") } */ private fun subscribeMQTTTopic(topic: String){ if(context==null){ Log.e(DEBUG_TAG, "Trying to connect but context is null") return } client!!.subscribeWith() .topicFilter(topic).callback {publish -> messageArrived(publish.topic.toString(), publish) }.send().whenComplete { subAck, throwable-> if(throwable != null){ Log.e(DEBUG_TAG, "Error while subscribing to topic $topic", throwable) } else{ //add the responder to the list Log.d(DEBUG_TAG, "Subscribed to topic $topic, responders ready: ${respondersMap[topic]}") } } } private fun subscribeTopicAddResponder(topic: String, responderWR: WeakReference, lineId: String){ if (!respondersMap.contains(lineId)){ respondersMap[lineId] = ArrayList() } respondersMap[lineId]!!.add(responderWR) subscribeMQTTTopic(topic) } fun startAndSubscribe(lineId: String, responder: MQTTMatoListener, context: Context): Boolean{ //start the client, and then subscribe to the topic val topic = mapTopic(lineId) val vrResp = WeakReference(responder) this.context = context.applicationContext synchronized(this) { if(!isStarted || (client == null)){ connect(context.applicationContext) { //when connection is done, run this subscribeTopicAddResponder(topic, vrResp, lineId) } //wait for connection } else { subscribeTopicAddResponder(topic, vrResp, lineId) } //recheck if it is started } return true } fun stopMatoRequests(responder: MQTTMatoListener){ var removed = false + + for ((lineTopic,responderList)in respondersMap.entries){ - var done = false - for (el in responderList){ - if (el.get()==null){ - responderList.remove(el) - } else if(el.get() == responder){ - responderList.remove(el) - done = true - } - if (done) - break + val oldSize = responderList.size + + responderList.removeIf { + (it.get()==null) || (it.get()==responder) } - if(done) Log.d(DEBUG_TAG, "Removed one listener for topic $lineTopic, listeners: $responderList") + val diffLength = responderList.size - oldSize + if(diffLength > 0) + Log.d(DEBUG_TAG, "Removed $diffLength listeners for topic $lineTopic, listeners: $responderList") //if (done) break if (responderList.isEmpty()){ //actually unsubscribe try { val topic = mapTopic(lineTopic) client?.run{ unsubscribeWith().addTopicFilter(topic).send().whenComplete { subAck, throwable -> if (throwable!=null){ //error occurred Log.e(DEBUG_TAG, "Error while unsubscribing to topic $topic",throwable) } } } } catch (e: Exception){ Log.e(DEBUG_TAG, "Tried unsubscribing but there was an error in the client library:\n$e") } } - removed = done || removed + removed = (diffLength>0) || removed } // check responders map, remove lines that have no responders - - for(line in respondersMap.keys){ - if(respondersMap[line]?.isEmpty() == true){ - respondersMap.remove(line) - } + respondersMap.entries.removeIf { + it.value.isEmpty() } Log.d(DEBUG_TAG, "Removed: $removed, respondersMap: $respondersMap") } /** * Cancel the notification */ fun removeNotification(context: Context){ val notifManager = context.applicationContext.getSystemService(Context.NOTIFICATION_SERVICE) as NotificationManager notifManager.cancel(MQTT_NOTIFICATION_ID) } private fun sendUpdateToResponders(responders: ArrayList>): Int{ //var sent = false var count = 0 for (wrD in responders) { if (wrD.get() == null) { Log.d(DEBUG_TAG, "Removing weak reference") responders.remove(wrD) } else { val resp = wrD.get()!! resp.onStatusUpdate(LivePositionsServiceStatus.OK) positionsLock.withLock { resp.onUpdateReceived(currentPositions) } //sent = true count++ } } return count } private fun sendStatusToResponders(status: LivePositionsServiceStatus){ val responders = respondersMap for (els in respondersMap.values) for (wrD in els) { if (wrD.get() == null) { Log.d(DEBUG_TAG, "Removing weak reference") els.remove(wrD) } else { wrD.get()!!.onStatusUpdate(status) //sent = true } } } /*override fun connectionLost(cause: Throwable?) { var doReconnect = false for ((line,elms) in respondersMap.entries){ if(!elms.isEmpty()){ doReconnect = true break } } if (!doReconnect){ Log.d(DEBUG_TAG, "Disconnected, but no responders to give the positions, avoid reconnecting") //finish here return } Log.w(DEBUG_TAG, "Lost connection in MQTT Mato Client") synchronized(this){ // isStarted = false //var i = 0 // while(i < 20 && !isStarted) { connect(context!!, object: IMqttActionListener{ override fun onSuccess(asyncActionToken: IMqttToken?) { //relisten to messages for ((line,elms) in respondersMap.entries){ val topic = mapTopic(line) if(elms.isEmpty()) respondersMap.remove(line) else { client!!.subscribe(topic, QoS.AtMostOnce.value, null, null) Log.d(DEBUG_TAG, "Resubscribed with topic $topic") } } Log.d(DEBUG_TAG, "Reconnected to MQTT Mato Client") } override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) { Log.w(DEBUG_TAG, "Failed to reconnect to MQTT server") } }) } } */ fun messageArrived(topic: String?, message: Mqtt3Publish) { if (topic==null) return //Log.d(DEBUG_TAG,"Arrived message on topic $topic, ${String(message.payload)}") parseMessageAndAddToList(topic, message) //GlobalScope.launch { } } private fun parseMessageAndAddToList(topic: String, message: Mqtt3Publish){ //val mqttTopic = message.topic //Log.d(DEBUG_TAG, "Topic of message received: $mqttTopic") val vals = topic.split("/") val lineId = vals[1] val vehicleId = vals[2] val timestamp = makeUnixTimestamp() val messString = String(message.payloadAsBytes) if(BuildConfig.DEBUG) Log.d(DEBUG_TAG, "Received message on topic: $topic") try { val jsonList = JSONArray(messString) if(jsonList.get(4)==null){ Log.d(DEBUG_TAG, "We have null tripId: line $lineId veh $vehicleId: $jsonList") sendStatusToResponders(LivePositionsServiceStatus.NO_POSITIONS) return } val posUpdate = LivePositionUpdate( jsonList.getString(4)+"U", null, null, lineId+"U", vehicleId, jsonList.getDouble(0), //latitude jsonList.getDouble(1), //longitude if(jsonList.get(2).equals(null)) null else jsonList.getInt(2).toFloat(), //"heading" (same as bearing?) timestamp, if(jsonList.get(6).equals(null)) null else jsonList.getInt(6).toString() //nextStop ) //add update //var valid = false positionsLock.withLock { if (!currentPositions.contains(lineId)) currentPositions[lineId] = HashMap() currentPositions[lineId]!!.let { it[vehicleId] = posUpdate //valid = true } } //sending //Log.d(DEBUG_TAG, "Parsed update on topic $topic, line $lineId, responders $respondersMap") var cc = 0 if (LINES_ALL in respondersMap.keys) { val count = sendUpdateToResponders(respondersMap[LINES_ALL]!!) cc +=count } if(lineId in respondersMap.keys){ cc += sendUpdateToResponders(respondersMap[lineId]!!) } //Log.d(DEBUG_TAG, "Sent to $cc responders, have $respondersMap") if(cc==0){ Log.w(DEBUG_TAG, "We have received an update but apparently there is no one to send it") var emptyResp = true for(en in respondersMap.values){ if(!en.isEmpty()){ emptyResp=false break } } //try unsubscribing to all if(emptyResp) { Log.d(DEBUG_TAG, "Unsubscribe all") //client!!.unsubscribe(LINES_ALL) } } //Log.d(DEBUG_TAG, "We have update on line $lineId, vehicle $vehicleId") } catch (e: JSONException){ Log.w(DEBUG_TAG,"Cannot decipher message on topic $topic, line $lineId, veh $vehicleId (bad JSON)") sendStatusToResponders(LivePositionsServiceStatus.ERROR_PARSING_RESPONSE) } catch (e: Exception){ Log.e(DEBUG_TAG, "Exception occurred", e) sendStatusToResponders(LivePositionsServiceStatus.ERROR_PARSING_RESPONSE) } } /** * Remove positions older than `timeMins` minutes */ fun clearOldPositions(timeMins: Int){ val currentTimeStamp = makeUnixTimestamp() var c = 0 positionsLock.withLock{ for ((k, posByVeh) in currentPositions) { /*for (t in posByVeh.keys.toList()) { // iterate over snapshot to avoid modification error val p = posByVeh[t] ?: continue if (currentTimeStamp - p.timestamp > timeMins * 60) { posByVeh.remove(t) c += 1 } } */ c+=posByVeh.entries.removeIf{ el -> currentTimeStamp - el.value.timestamp > timeMins * 60 }.let { if(it) 1 else 0 } } } Log.d(DEBUG_TAG, "Removed $c positions older than $timeMins minutes") } /*/** * Stop the service forever. Client has not to be used again!! */ fun closeClientForever(){ client.disconnect() client.close() }*/ fun disconnect(){ client?.disconnect() } companion object{ const val SERVER_ADDR="mapi.5t.torino.it" const val SERVER_PATH="/scre" const val LINES_ALL="ALL" private const val DEBUG_TAG="BusTO-MatoMQTT" //this has to match the value in MQTT library (MQTTAndroidClient) const val MQTT_NOTIFICATION_ID: Int = 77 @JvmStatic fun mapTopic(lineId: String): String{ return if(lineId== LINES_ALL || lineId == "#") "#" else{ "/${lineId}/#" } } fun getRandomString(length: Int) : String { val allowedChars = ('a'..'f') + ('0'..'9') return (1..length) .map { allowedChars.random() } .joinToString("") } interface MQTTMatoListener{ //positionsMap is a dict with line -> vehicle -> Update fun onUpdateReceived(posUpdates: PositionsMap) fun onStatusUpdate(status: LivePositionsServiceStatus) } fun getHTTPHeaders(): HashMap { val headers = HashMap() headers["Origin"] = "https://mato.muoversiatorino.it" headers["Host"] = "mapi.5t.torino.it" return headers } private fun makeClientBuilder() : Mqtt3ClientBuilder{ val clientID = "mqtt-explorer-${getRandomString(8)}" val r = MqttClient.builder() .useMqttVersion3() .identifier(clientID) .serverHost(SERVER_ADDR) .serverPort(443) .sslWithDefaultConfig() .automaticReconnect(MqttClientAutoReconnect.builder() .initialDelay(500, TimeUnit.MILLISECONDS) .maxDelay(60, TimeUnit.SECONDS).build()) .webSocketConfig() .httpHeaders(getHTTPHeaders()) .serverPath("scre") .applyWebSocketConfig() //.webSocketWithDefaultConfig() return r } private fun makeUnixTimestamp(): Long{ val timestamp = (System.currentTimeMillis() / 1000 ) return timestamp } } private fun interface OnConnect{ fun onConnectSuccess() } } /*data class MQTTPositionUpdate( val lineId: String, val vehicleId: String, val latitude: Double, val longitude: Double, val heading: Int?, val speed: Int?, val tripId: String?, val direct: Int?, val nextStop: Int?, //val full: Int? )*/ \ No newline at end of file diff --git a/app/src/main/java/it/reyboz/bustorino/map/MapLibreUtils.kt b/app/src/main/java/it/reyboz/bustorino/map/MapLibreUtils.kt index 72f7e71..4eba930 100644 --- a/app/src/main/java/it/reyboz/bustorino/map/MapLibreUtils.kt +++ b/app/src/main/java/it/reyboz/bustorino/map/MapLibreUtils.kt @@ -1,231 +1,234 @@ package it.reyboz.bustorino.map import android.animation.TypeEvaluator import android.content.Context import android.util.Log import it.reyboz.bustorino.BuildConfig import it.reyboz.bustorino.backend.Stop import org.maplibre.android.geometry.LatLng import org.maplibre.android.location.LocationComponentActivationOptions import org.maplibre.android.location.LocationComponentOptions import org.maplibre.android.location.engine.LocationEngineRequest import org.maplibre.android.maps.Style import org.maplibre.geojson.Point import org.maplibre.turf.TurfMeasurement class MapLibreUtils { companion object{ const val STYLE_BRIGHT_DEFAULT_JSON = "map_style_good_noshops.json" const val STYLE_VERSATILES_COLORFUL_JSON = "versatiles_colorful_light.json" const val STYLE_OSM_RASTER="openstreetmap_raster.json" private const val DEBUG_TAG ="BusTO-MapLibreUtils" @JvmStatic fun getDefaultStyleJson() = STYLE_VERSATILES_COLORFUL_JSON @JvmStatic fun shortestRotation(from: Float, to: Float): Float { var delta = (to - from) % 360 if (delta > 180) delta -= 360 if (delta < -180) delta += 360 return from + delta } @JvmStatic fun buildLocationComponentActivationOptions( style: Style, locationComponentOptions: LocationComponentOptions, context: Context ): LocationComponentActivationOptions { return LocationComponentActivationOptions .builder(context, style) .locationComponentOptions(locationComponentOptions) .useDefaultLocationEngine(true) .locationEngineRequest( LocationEngineRequest.Builder(750) .setFastestInterval(750) .setPriority(LocationEngineRequest.PRIORITY_HIGH_ACCURACY) .build() ) .build() } @JvmStatic fun calcDistanceInSegment(points: List, from: Int, to:Int): Double{ var d=0.0 var prev = points[from] for(i in from+1..to){ d += prev.distanceTo(points[i]) prev = points[i] } return d } @JvmStatic fun findIndexMidPoint(points: List, from: Int, to: Int, distThresh:Double): Int{ var totdist=0.0 var idx = -2 var prev = points[from] for(i in from+1..to){ totdist += prev.distanceTo(points[i]) prev = points[i] if (totdist >= distThresh){ idx = i break } } if(idx==-2) throw Error("Distance out of bounds, total distance is $totdist") return idx } @JvmStatic fun findPointsToPutDirectionMarkers(polyPoints: List, stops: List, distanceIcon: Double): List{ + //output value + val pointsOutput = mutableListOf() val closestIndices = findIndicesClosestPointsForStops(polyPoints, stops) Log.d(DEBUG_TAG, "idcs: $closestIndices") + if(closestIndices.size==0) + return pointsOutput val distancesSec = mutableListOf() var pi = closestIndices[0] val cumulativeDist = mutableListOf() var sum = 0.0 - val pointsOutput = mutableListOf() var nPoints = 0 var distFromLastPoint = 0.0 for(i in 1..= distanceIcon){ if(BuildConfig.DEBUG) Log.d(DEBUG_TAG, "Add between stop ${stops[i-1]} and stop ${stops[i]}, distance between: $dd") if(dd>100) { val imid = findIndexMidPoint(polyPoints, pi, newi, dd / 2) pointsOutput.add(imid) nPoints += 1 distFromLastPoint=0.0 } } else{ //add the last distance //distFromLastPoint+=dd/2 } pi= newi } return pointsOutput } /* VERSION WITH TOTAL DISTANCE val distancesSec = mutableListOf() var prevk = 0 val cumulativeDist = mutableListOf() var sum = 0.0 val pointsOutput = mutableListOf() var nPoints = 0 //for(i in 1..= distanceIcon ){ //Log.d(DEBUG_TAG, "Add between stop ${stops[lastStopidx]} and stop ${stops[lastStopidx+1]}") //find closest stops var stopDist = Math.min(polyPoints[prevk].distanceTo(polyPoints[lastStopidx]),polyPoints[k].distanceTo(polyPoints[lastStopidx])) if(lastStopidx+1 < stops.size){ stopDist = Math.min(stopDist, Math.min(polyPoints[prevk].distanceTo(polyPoints[lastStopidx+1]),polyPoints[k].distanceTo(polyPoints[lastStopidx+1])) ) } if(stopDist>100) { val imid = findIndexMidPoint(polyPoints, prevk, k, dd / 2) pointsOutput.add(imid) nPoints += 1 distFromLast = 0.0 } } prevk = k if (k>closestIndices[lastStopidx]) lastStopidx +=1 } return pointsOutput */ @JvmStatic fun splitPolyWhenDistanceTooBig(points: List, distMax: Double): List{ val outList = mutableListOf(points[0]) var oldP = points[0] for(i in 1.. distMax){ val newLat = (oldP.latitude+newP.latitude)/2 val newLong = (oldP.longitude+newP.longitude)/2 val extraP = LatLng(newLat,newLong) outList.add(extraP) } outList.add(newP) oldP=newP } return outList } @JvmStatic fun findIndicesClosestPointsForStops(points:List, stops:List): List { val closestIndices = stops.map { s-> val p = LatLng(s.latitude!!, s.longitude!!) var dist = 10_000_000.0 // in meters var id = -1 for (j in points.indices){ val newd = p.distanceTo(points[j]) if (newd { private val latLng = LatLng() override fun evaluate(fraction: Float, startValue: LatLng, endValue: LatLng): LatLng { latLng.latitude = startValue.latitude + (endValue.latitude - startValue.latitude) * fraction latLng.longitude = startValue.longitude + (endValue.longitude - startValue.longitude) * fraction return latLng } } } \ No newline at end of file