looking for some solutions? You are welcome.

SOLVED: Apply function from external library in pyspark/pandas

Ahmad Suliman:

I have Dataframe contain two columns contain coordinates in the geographic coordinate system latitude and longitude. I need to apply a method in OSMnx library which returns the nearest node and the distance to specified points as follows:

osmnx.utils.get_nearest_node(G, point, method='haversine', return_dist=False)

source: https://osmnx.readthedocs.io/en/stable/osmnx.html?highlight=get_nearest_node#osmnx.utils.get_route_edge_attributes where G is network and driven by using one of the method that existed in this link https://geoffboeing.com/2016/11/osmnx-python-street-networks/

Where my trial is :

udf_node=fn.udf(lambda x,y:ox.get_nearest_node(G, (x,y), return_dist=True)[0],IntegerType())
udf_node_dist=fn.udf(lambda x,y:ox.get_nearest_node(G, (x,y), return_dist=True)[1],FloatType())

df = df.withColumn('node',udf_node(fn.col('longitude'), fn.col('latitude')))
df = df.withColumn('node_dist',udf_node_dist(fn.col('longitude'), fn.col('latitude')))

while calling show function or trying to save the resulting dataframe as parquet I got the following error :

Py4JJavaError: An error occurred while calling o976.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 1 times, most recent failure: Lost task 0.0 in stage 34.0 (TID 104, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)

How can I apply the external library function in pyspark?


def nearest_node_dist(x,y):
    return ox.get_nearest_node(G, (x,y), return_dist=True) #Output: (nearest_node,dist)

udf_node=fn.udf(lambda x,nearest_node_dist(x,y)[0],IntegerType())
udf_dist=fn.udf(lambda x,nearest_node_dist(x,y)[1],FloatType())

df = df.withColumn('node',udf_node(fn.col('longitude'), fn.col('latitude')))
df = df.withColumn('dist',udf_dist(fn.col('longitude'), fn.col('latitude')))

I also got the same error.

In pandas tits running well for a sample of the dataframe :

where the code as follows:

pddfsample=pddf.head() ## pddf is pandas dataframe

def nearest_node(x,y):
    nearest_node,dist=ox.get_nearest_node(G, (x,y), return_dist=True)
    return nearest_node 
def dist_to_Nnode(x,y):
    nearest_node,dist=ox.get_nearest_node(G, (x,y), return_dist=True)
    return dist 

pddf['nearest_node'] = np.vectorize(nearest_node)(pddf['longitude'],pddf['latitude'])

pddf['dist_to_Nnode'] = np.vectorize(dist_to_Nnode)(pddf['longitude'],pddf['latitude'])   

Although it running well, it took a lot of time for the whole dataframe.

Posted in S.E.F
via StackOverflow & StackExchange Atomic Web Robots

No comments: