Prefiero usar el marco de datos nativo de Spark, porque me permite una mayor personalización. Puedo usar stringtype propiedad para convertir el campo json del marco de datos al campo jsonb en la tabla. Para este caso, mi marco de datos tiene dos campos.
from pyspark import SparkConf
sc = SparkContext.getOrCreate(SparkConf())
spark = SparkSession(sc)
df = spark.read.format('csv') \
.option('delimiter','|') \
.option('header','True') \
.load('your_path')
##some transformation...
url = 'jdbc:postgresql://your_host:5432/your_databasename'
properties = {'user':'*****',
'password':'*****',
'driver': "org.postgresql.Driver",
'stringtype':"unspecified"}
df.write.jdbc(url=url, table='your_tablename', mode='append', properties=properties)
Antes de ejecutar el script anterior, debe crear la tabla en postgresql, porque la propiedad modo está configurado como append . Esto de la siguiente manera:
create table your_tablename
(
my_json_field jsonb,
another_field int
)