String search across all Databricks notebooks at the workspace level
Use case: Is it possible to inspect all notebooks in a given Databricks workspace and build an inventory of the places where notebooks access data directly through ADLS locations rather than through the metastore?
Information to capture:
- Names of notebooks that reference "abfss://"
- The specific file paths referenced in each notebook
Please help if this is possible.
This capability is not available in Azure Databricks - you can only search by notebook or folder name. But you can still do what you want by exporting the notebooks to local disk and searching for the string there. For the export, you can use the Databricks CLI's workspace export_dir command, like this:
databricks workspace export_dir '/Shared/' ~/tmp/databricks-files
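For the search step itself, here is a minimal sketch (assuming the notebooks were exported as source files to ~/tmp/databricks-files, as in the command above) that walks the exported tree and prints every notebook together with the abfss:// paths it references:
import os
import re

EXPORT_DIR = os.path.expanduser("~/tmp/databricks-files")  # target directory from the export_dir command above
ABFSS_PATTERN = re.compile(r'abfss://[^\s"\'()]+')

# Walk the exported files and report every notebook that references an abfss:// path.
for root, _, files in os.walk(EXPORT_DIR):
    for name in files:
        file_path = os.path.join(root, name)
        with open(file_path, errors="ignore") as fh:
            matches = ABFSS_PATTERN.findall(fh.read())
        if matches:
            print(file_path)
            for match in sorted(set(matches)):
                print("   ", match)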
In the end we found a proper solution. Even though the workspace export_dir command is one solution for this use case, it means downloading every notebook from the workspace to local disk, which our security team did not recommend. So, as an alternative, we implemented it with the 2.0/workspace/export REST API from within a Databricks notebook itself.
Steps:
- Find all notebook objects, including notebook path, object ID, etc.
- Iterate over all notebook paths and fetch each notebook's content in encoded form (see the minimal sketch after this list)
- Decode the notebook content and check whether each decoded notebook contains the search string
- Fetch the notebook owner based on the object ID
- Generate the final report
Reference: https://docs.databricks.com/dev-tools/api/latest/workspace.html#export
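At its core, steps 2-3 come down to one call to the export endpoint followed by a base64 decode. Here is a minimal sketch with a placeholder instance URL, token, and notebook path:
import base64
import requests

instance = "https://<your-instance>.azuredatabricks.net"  # placeholder workspace URL
headers = {"Authorization": "Bearer <token>"}              # placeholder personal access token

# Export one notebook as base64-encoded source (step 2)...
resp = requests.get(
    f"{instance}/api/2.0/workspace/export",
    headers=headers,
    params={"path": "/Users/someone@example.com/my_notebook", "format": "SOURCE"},
).json()

# ...then decode it and test for the search string (step 3).
source = base64.b64decode(resp["content"]).decode("utf-8")
print("abfss://" in source)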
Complete code for reference:
import base64
import json
import requests
# Workspace URL and API endpoints
databricks_instance = "https://databricks_instance.azuredatabricks.net"
url_list = f"{databricks_instance}/api/2.0/workspace/list"
url_export = f"{databricks_instance}/api/2.0/workspace/export"
# Start listing from the /Users/ folder
payload = json.dumps({"path": "/Users/"})
headers = {
    'Authorization': 'Bearer user_token',  # replace user_token with a personal access token
    'Content-Type': 'application/json'
}
response = requests.request("GET", url_list, headers=headers, data=payload).json()
notebooks = []
# Recursively collect every notebook object under the listed path.
def list_notebooks(mylist):
    for element in mylist.get('objects', []):
        if element['object_type'] == 'NOTEBOOK':
            notebooks.append(element)
        if element['object_type'] == 'DIRECTORY':
            payload_inner = json.dumps({"path": element['path']})
            response_inner = requests.request("GET", url_list, headers=headers, data=payload_inner).json()
            if len(response_inner) != 0:  # empty directories return an empty dict
                list_notebooks(response_inner)
    return notebooks
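# For reference, each collected element looks roughly like this (illustrative values):
#   {'object_type': 'NOTEBOOK', 'path': '/Users/someone@example.com/etl', 'object_id': 123456789, 'language': 'PYTHON'}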
result = list_notebooks(response)
print(result[0])
# Optional requests auth helper (not used below; the headers dict already carries the token).
class BearerAuth(requests.auth.AuthBase):
    def __init__(self, token):
        self.token = token
    def __call__(self, r):
        r.headers["Authorization"] = f'Bearer {self.token}'
        return r
# Fetch the access control list of a notebook from the permissions API.
def get_permission(var_object_id):
    url_permissions = f"{databricks_instance}/api/2.0/preview/permissions/notebooks/{var_object_id}"
    response = requests.request("GET", url_permissions, headers=headers).json()
    access_control_list = response['access_control_list']
    return access_control_list
# Flatten the ACL into alternating principal names and permission levels.
def get_control_user_list(access_control_list):
    access_control_user_list = []
    for var_idx in access_control_list:
        var_user_name = var_idx.get('user_name') or var_idx.get('group_name') or 'unknown'
        var_user_permission = var_idx['all_permissions'][0]['permission_level']
        access_control_user_list.append(var_user_name)
        access_control_user_list.append(var_user_permission)
    return access_control_user_list
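# For reference, an ACL entry has roughly this shape (illustrative values):
#   {'group_name': 'data-engineers',
#    'all_permissions': [{'permission_level': 'CAN_MANAGE', 'inherited': True}]}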
# Get the current notebook path so the scanner can skip itself
var_current_notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
# Search types and patterns; only the Oracle branch is shown in full below
var_search_type_oracle = 'Oracle'
var_oracle_search_pattern = 'jdbc:oracle'
# Placeholder for the broader on-prem pattern used in the outer check below;
# the original defined it elsewhere - adjust to your own pattern
var_other_onprem_search_pattern = 'jdbc:'
# Target location for the report
var_report_target_location = '/writing/into/target/location'
# Format one report record as pipe-separated fields.
def get_format_result(var_notebook_path, var_search_type, var_access_user_control_list):
    var_access_user_control_list_str = '-'.join(var_access_user_control_list)
    final_format_record = var_notebook_path + "|" + var_search_type + "|" + var_access_user_control_list_str
    return final_format_record
# Open the report file
f = open(f"{var_report_target_location}/mytarget", 'w')
# Process every notebook object
var_total_notebook_count_processed = 0
var_total_notebook_count_oracle = 0
# Counters for the other search types handled by branches not shown here
var_total_notebook_count_teradata = 0
var_total_notebook_count_datalakeservice = 0
var_total_notebook_count_oauth = 0
for notebook_items in result:
    var_notebook_path = notebook_items["path"]
    var_object_id = notebook_items["object_id"]
    var_note_payload = json.dumps({"path": f"{var_notebook_path}"})
    var_response = requests.request("GET", url_export, headers=headers, data=var_note_payload).json()
    try:
        var_response_content = var_response['content']
    except KeyError:
        # Skip notebooks that cannot be exported, e.g.
        # {'error': 'DatabricksServiceException: BAD_REQUEST: content size (15395930) exceeded the limit 10485760'}
        continue
    var_response_content_str = base64.b64decode(var_response_content).decode("utf-8")
    var_total_notebook_count_processed += 1
    # Skip this scanner notebook itself; keep only notebooks that mention
    # both an on-prem JDBC pattern and a password
    if var_response_content_str.find(var_other_onprem_search_pattern) != -1 and var_response_content_str.find("password") != -1 and var_notebook_path != var_current_notebook_path:
        # Notebooks containing an Oracle connection with a password
        if var_response_content_str.find(var_oracle_search_pattern) != -1:
            print(var_notebook_path)
            var_access_control_list = get_permission(var_object_id)
            var_access_user_control_list = get_control_user_list(var_access_control_list)
            var_final_format_record = get_format_result(var_notebook_path, var_search_type_oracle, var_access_user_control_list)
            print(var_final_format_record)
            f.write(var_final_format_record + '\n')
            var_total_notebook_count_oracle += 1
f.close()
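Note that the code above only tests whether a pattern occurs somewhere in a notebook. For the original abfss:// use case you also want the specific file paths, which a small regular-expression helper can pull out of the same decoded content (extract_abfss_paths is a hypothetical helper, not part of the code above):
import re

# Hypothetical helper: collect the distinct abfss:// URIs in decoded notebook source.
def extract_abfss_paths(decoded_source):
    return sorted(set(re.findall(r'abfss://[^\s"\'()]+', decoded_source)))

# Example usage inside the loop above, right after decoding:
#     for abfss_path in extract_abfss_paths(var_response_content_str):
#         f.write(var_notebook_path + "|ABFSS|" + abfss_path + '\n')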
I built a web app that performs an elastic search to find snippets in your Databricks workspace or repos. I've been using it all the time at work.