I have Dataframe contain two columns contain coordinates in the geographic coordinate system latitude and longitude. I need to apply a method in OSMnx library which returns the nearest node and the distance to specified points as follows:

```
osmnx.utils.get_nearest_node(G, point, method='haversine', return_dist=False)
```

source: https://osmnx.readthedocs.io/en/stable/osmnx.html?highlight=get_nearest_node#osmnx.utils.get_route_edge_attributes where G is network and driven by using one of the method that existed in this link https://geoffboeing.com/2016/11/osmnx-python-street-networks/

Where my trial is :

```
udf_node=fn.udf(lambda x,y:ox.get_nearest_node(G, (x,y), return_dist=True)[0],IntegerType())
udf_node_dist=fn.udf(lambda x,y:ox.get_nearest_node(G, (x,y), return_dist=True)[1],FloatType())
df = df.withColumn('node',udf_node(fn.col('longitude'), fn.col('latitude')))
df = df.withColumn('node_dist',udf_node_dist(fn.col('longitude'), fn.col('latitude')))
```

while calling show function or trying to save the resulting dataframe as parquet I got the following error :

```
Py4JJavaError: An error occurred while calling o976.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 34.0 failed 1 times, most recent failure: Lost task 0.0 in stage 34.0 (TID 104, localhost, executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
...
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
...
```

How can I apply the external library function in pyspark?

**Update1:**

```
def nearest_node_dist(x,y):
return ox.get_nearest_node(G, (x,y), return_dist=True) #Output: (nearest_node,dist)
udf_node=fn.udf(lambda x,nearest_node_dist(x,y)[0],IntegerType())
udf_dist=fn.udf(lambda x,nearest_node_dist(x,y)[1],FloatType())
df = df.withColumn('node',udf_node(fn.col('longitude'), fn.col('latitude')))
df = df.withColumn('dist',udf_dist(fn.col('longitude'), fn.col('latitude')))
```

I also got the same error.

In pandas tits running well for a sample of the dataframe :

where the code as follows:

```
pddfsample=pddf.head() ## pddf is pandas dataframe
def nearest_node(x,y):
nearest_node,dist=ox.get_nearest_node(G, (x,y), return_dist=True)
return nearest_node
def dist_to_Nnode(x,y):
nearest_node,dist=ox.get_nearest_node(G, (x,y), return_dist=True)
return dist
pddf['nearest_node'] = np.vectorize(nearest_node)(pddf['longitude'],pddf['latitude'])
pddf['dist_to_Nnode'] = np.vectorize(dist_to_Nnode)(pddf['longitude'],pddf['latitude'])
```

Although it running well, it took a lot of time for the whole dataframe.

Posted in S.E.F

via

**StackOverflow & StackExchange Atomic Web Robots**

## No comments:

Post a Comment