Categories: java, apache-spark, dataset

Calculate a value in a Spark Dataset depending on the previous version, with Java Spark

1 answer

I have this dataset:

    ID       timestamp   value
    unique1  1584420000  120
    unique1  1584410000  100
    unique1  1584400000  20
    unique2  1584410000  90
    unique2  1584400000  10
    unique3  1584400000  30

I need to calculate the value for each ID and version based on the previous version of the same ID. If an ID has no previous version, the value is kept as is.

    ID       timestamp   valueCalculated
    unique1  1584420000  20
    unique1  1584410000  80
    unique1  1584400000  20
    unique2  1584410000  80
    unique2  1584400000  10
    unique3  1584400000  30

I have tried to achieve this, but I am only able to aggregate by ID and version and subtract the last two available versions (if an ID hasn't been updated, it keeps its value), which gives only one row per ID.

    ID       timestamp   valueCalculated
    unique1  1584420000  20
    unique2  1584410000  80
    unique3  1584400000  30

This is my code:

    dataset.groupBy("id", "timestamp")
        .agg(
            max("timestamp").as("timestamp"),
            functions.callUDF("CalculateValue",
                first("timestamp"), first("value"),
                last("timestamp"), last("value")
            ).as("valueCalculated")
        );

I have used a UDF4 to calculate the expected value:

    sparksession.udf().register("CalculateValue",
        (UDF4<Long, Double, Long, Double, Double>) this::calculateValue,
        DataTypes.DoubleType);

    public Double calculateValue(Long version1, Double value1, Long version2, Double value2) {
        if (version1.equals(version2)) {
            return value1;
        } else {
            return value1 - value2;
        }
    }

I don't think I am using the right approach here because of the aggregation. Could you please help me achieve this? Thanks.

All answers to this question, which has the identifier 60722051

The best answer:

I don't understand what "version" means here, but you can calculate the difference between the value of the current row and that of the previous row like this:

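    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.lag
    import spark.implicits._  // for the $"..." syntax; assumes a SparkSession named spark

    // One window per ID, rows ordered by timestamp
    val w = Window.partitionBy("ID").orderBy("timestamp")

    // lag(..., 1, 0) reads the previous row's value, or 0 when there is none
    df.withColumn("previousValue", lag($"value", 1, 0).over(w))
      .withColumn("valueCalculated", $"value" - $"previousValue")
      .orderBy("ID", "timestamp")
      .show(false)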

which will give you the result as follows:

    +-------+----------+-----+-------------+---------------+
    |ID     |timestamp |value|previousValue|valueCalculated|
    +-------+----------+-----+-------------+---------------+
    |unique1|1584400000|20   |0            |20             |
    |unique1|1584410000|100  |20           |80             |
    |unique1|1584420000|120  |100          |20             |
    |unique2|1584400000|10   |0            |10             |
    |unique2|1584410000|90   |10           |80             |
    |unique3|1584400000|30   |0            |30             |
    +-------+----------+-----+-------------+---------------+
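
To make the window semantics concrete, here is a minimal plain-Java sketch (no Spark dependency) of what "partition by ID, order by timestamp, subtract the lagged value with a default of 0" computes. The class `LagDiffSketch`, its nested `Row` type and the method `withPreviousDiff` are hypothetical names used only for illustration; they are not part of Spark or of the original code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; mimics the window logic in memory.
class LagDiffSketch {

    // One row of the example dataset: (ID, timestamp, value).
    static final class Row {
        final String id;
        final long timestamp;
        final double value;

        Row(String id, long timestamp, double value) {
            this.id = id;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns the rows sorted by (id, timestamp), where each value is replaced by
    // value - previous value of the same id (previous = 0 for the first row of an id).
    static List<Row> withPreviousDiff(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.id)
                .thenComparingLong(r -> r.timestamp));

        Map<String, Double> previousById = new HashMap<>();
        List<Row> result = new ArrayList<>();
        for (Row r : sorted) {
            // Same role as the default argument 0 in lag(value, 1, 0)
            double previous = previousById.getOrDefault(r.id, 0.0);
            result.add(new Row(r.id, r.timestamp, r.value - previous));
            previousById.put(r.id, r.value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("unique1", 1584420000L, 120),
                new Row("unique1", 1584410000L, 100),
                new Row("unique1", 1584400000L, 20),
                new Row("unique2", 1584410000L, 90),
                new Row("unique2", 1584400000L, 10),
                new Row("unique3", 1584400000L, 30));

        for (Row r : withPreviousDiff(rows)) {
            System.out.println(r.id + " " + r.timestamp + " " + r.value);
        }
    }
}
```

Run on the question's data, the computed differences match the valueCalculated column above. Unlike the `groupBy` attempt, nothing is aggregated away: every input row yields exactly one output row, which is the property the window function provides in Spark.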
